#!/usr/bin/env python3
-from dataclasses import dataclass
import datetime
import json
import logging
import os
-from typing import Any, List
import urllib.request
+from dataclasses import dataclass
+from typing import Any, List
from overrides import overrides
type=str,
default=f'{os.environ["HOME"]}/cache/.weather_summary_cache',
metavar='FILENAME',
- help='File in which to cache weather data'
+ help='File in which to cache weather data',
)
cfg.add_argument(
'--weather_data_stalest_acceptable',
type=argparse_utils.valid_duration,
- default=datetime.timedelta(seconds=7200), # 2 hours
+ default=datetime.timedelta(seconds=7200), # 2 hours
metavar='DURATION',
- help='Maximum acceptable age of cached data. If zero, forces a refetch'
+ help='Maximum acceptable age of cached data. If zero, forces a refetch',
)
@dataclass
class WeatherData:
- date: datetime.date # The date
- high: float # The predicted high in F
- low: float # The predicted low in F
- precipitation_inches: float # Number of inches of precipitation / day
- conditions: List[str] # Conditions per ~3h window
- most_common_condition: str # The most common condition
- icon: str # An icon to represent it
+ date: datetime.date # The date
+ high: float # The predicted high in F
+ low: float # The predicted low in F
+ precipitation_inches: float # Number of inches of precipitation / day
+ conditions: List[str] # Conditions per ~3h window
+ most_common_condition: str # The most common condition
+ icon: str # An icon to represent it
-@persistent.persistent_autoloaded_singleton()
+@persistent.persistent_autoloaded_singleton() # type: ignore
class CachedWeatherData(persistent.Persistent):
- def __init__(self,
- weather_data = None):
+ def __init__(self, weather_data=None):
if weather_data is not None:
self.weather_data = weather_data
return
"Sand": "🏜️",
"Ash": "🌋",
"Squall": "🌬",
- "Tornado": "🌪️"
+ "Tornado": "🌪️",
}
now = datetime.datetime.now()
dates = set()
lows = {}
conditions = {}
precip = {}
- param = "id=5786882" # Bellevue, WA
+ param = "id=5786882" # Bellevue, WA
key = "c0b160c49743622f62a9cd3cda0270b3"
www = urllib.request.urlopen(
f'http://api.openweathermap.org/data/2.5/weather?zip=98005,us&APPID={key}&units=imperial'
if dt == now.date() and now.hour > 18 and condition == 'Clear':
icon = '🌙'
self.weather_data[dt] = WeatherData(
- date = dt,
- high = float(parsed_json["main"]["temp_max"]),
- low = float(parsed_json["main"]["temp_min"]),
- precipitation_inches = p / 25.4,
- conditions = [condition],
- most_common_condition = condition,
- icon = icon,
+ date=dt,
+ high=float(parsed_json["main"]["temp_max"]),
+ low=float(parsed_json["main"]["temp_min"]),
+ precipitation_inches=p / 25.4,
+ conditions=[condition],
+ most_common_condition=condition,
+ icon=icon,
)
www = urllib.request.urlopen(
lows[dt] = None
conditions[dt] = []
for temp in (
- data["main"]["temp"],
- data['main']['temp_min'],
- data['main']['temp_max'],
+ data["main"]["temp"],
+ data['main']['temp_min'],
+ data['main']['temp_max'],
):
if highs[dt] is None or temp > highs[dt]:
highs[dt] = temp
for dt in sorted(dates):
if dt == today:
high = highs.get(dt, None)
- if (
- high is not None and
- self.weather_data[today].high < high
- ):
+ if high is not None and self.weather_data[today].high < high:
self.weather_data[today].high = high
continue
most_common_condition = list_utils.most_common(conditions[dt])
if dt == now.date() and now.hour > 18 and condition == 'Clear':
icon = '🌙'
self.weather_data[dt] = WeatherData(
- date = dt,
- high = highs[dt],
- low = lows[dt],
- precipitation_inches = precip[dt] / 25.4,
- conditions = conditions[dt],
- most_common_condition = most_common_condition,
- icon = icon
+ date=dt,
+ high=highs[dt],
+ low=lows[dt],
+ precipitation_inches=precip[dt] / 25.4,
+ conditions=conditions[dt],
+ most_common_condition=most_common_condition,
+ icon=icon,
)
@classmethod
@overrides
def load(cls) -> Any:
if persistent.was_file_written_within_n_seconds(
- config.config['weather_data_cachefile'],
- config.config['weather_data_stalest_acceptable'].total_seconds(),
+ config.config['weather_data_cachefile'],
+ config.config['weather_data_stalest_acceptable'].total_seconds(),
):
import pickle
+
with open(config.config['weather_data_cachefile'], 'rb') as rf:
weather_data = pickle.load(rf)
return cls(weather_data)
@overrides
def save(self) -> bool:
import pickle
+
with open(config.config['weather_data_cachefile'], 'wb') as wf:
pickle.dump(
self.weather_data,
#!/usr/bin/env python3
-from dataclasses import dataclass
import datetime
import logging
import os
-from typing import Any
import urllib.request
+from dataclasses import dataclass
+from typing import Any
import astral # type: ignore
+import pytz
from astral.sun import sun # type: ignore
from bs4 import BeautifulSoup # type: ignore
from overrides import overrides
-import pytz
import argparse_utils
import config
-import datetime_utils
import dateparse.dateparse_utils as dp
+import datetime_utils
import persistent
-import text_utils
import smart_home.thermometers as temps
-
+import text_utils
logger = logging.getLogger(__name__)
cfg = config.add_commandline_args(
f'Cached Weather Forecast ({__file__})',
- 'Arguments controlling detailed weather rendering'
+ 'Arguments controlling detailed weather rendering',
)
cfg.add_argument(
'--weather_forecast_cachefile',
type=str,
default=f'{os.environ["HOME"]}/cache/.weather_forecast_cache',
metavar='FILENAME',
- help='File in which to cache weather data'
+ help='File in which to cache weather data',
)
cfg.add_argument(
'--weather_forecast_stalest_acceptable',
type=argparse_utils.valid_duration,
- default=datetime.timedelta(seconds=7200), # 2 hours
+ default=datetime.timedelta(seconds=7200), # 2 hours
metavar='DURATION',
- help='Maximum acceptable age of cached data. If zero, forces a refetch'
+ help='Maximum acceptable age of cached data. If zero, forces a refetch',
)
@dataclass
class WeatherForecast:
- date: datetime.date # The date
- sunrise: datetime.datetime # Sunrise datetime
- sunset: datetime.datetime # Sunset datetime
- description: str # Textual description of weather
+ date: datetime.date # The date
+ sunrise: datetime.datetime # Sunrise datetime
+ sunset: datetime.datetime # Sunset datetime
+ description: str # Textual description of weather
-@persistent.persistent_autoloaded_singleton()
+@persistent.persistent_autoloaded_singleton() # type: ignore
class CachedDetailedWeatherForecast(persistent.Persistent):
- def __init__(self, forecasts = None):
+ def __init__(self, forecasts=None):
if forecasts is not None:
self.forecasts = forecasts
return
last_dt = now
dt = now
for (day, txt) in zip(
- forecast.find_all('b'),
- forecast.find_all(class_='col-sm-10 forecast-text')
+ forecast.find_all('b'), forecast.find_all(class_='col-sm-10 forecast-text')
):
last_dt = dt
try:
self.forecasts[dt.date()].description += '\n' + blurb
else:
self.forecasts[dt.date()] = WeatherForecast(
- date = dt,
- sunrise = sunrise,
- sunset = sunset,
- description = blurb,
+ date=dt,
+ sunrise=sunrise,
+ sunset=sunset,
+ description=blurb,
)
@classmethod
@overrides
def load(cls) -> Any:
if persistent.was_file_written_within_n_seconds(
- config.config['weather_forecast_cachefile'],
- config.config['weather_forecast_stalest_acceptable'].total_seconds(),
+ config.config['weather_forecast_cachefile'],
+ config.config['weather_forecast_stalest_acceptable'].total_seconds(),
):
import pickle
+
with open(config.config['weather_forecast_cachefile'], 'rb') as rf:
weather_data = pickle.load(rf)
return cls(weather_data)
@overrides
def save(self) -> bool:
import pickle
+
with open(config.config['weather_forecast_cachefile'], 'wb') as wf:
pickle.dump(
self.forecasts,
#!/usr/bin/env python3
-from typing import Any, Optional, List
+from typing import Any, Generator, List, Optional
class Node(object):
Note: value can be anything as long as it is comparable.
Check out @functools.total_ordering.
"""
- self.left = None
- self.right = None
+ self.left: Optional[Node] = None
+ self.right: Optional[Node] = None
self.value = value
"""Find helper"""
if value == node.value:
return node
- elif (value < node.value and node.left is not None):
+ elif value < node.value and node.left is not None:
return self._find(value, node.left)
- elif (value > node.value and node.right is not None):
+ elif value > node.value and node.right is not None:
return self._find(value, node.right)
return None
- def _parent_path(self, current: Node, target: Node):
+ def _parent_path(
+ self, current: Optional[Node], target: Node
+ ) -> List[Optional[Node]]:
if current is None:
return [None]
- ret = [current]
+ ret: List[Optional[Node]] = [current]
if target.value == current.value:
return ret
elif target.value < current.value:
ret.extend(self._parent_path(current.right, target))
return ret
- def parent_path(self, node: Node) -> Optional[List[Node]]:
+ def parent_path(self, node: Node) -> List[Optional[Node]]:
"""Return a list of nodes representing the path from
the tree's root to the node argument. If the node does
not exist in the tree for some reason, the last element
if node.right is not None:
yield from self._iterate_by_depth(node.right, depth - 1)
- def iterate_nodes_by_depth(self, depth: int):
+ def iterate_nodes_by_depth(self, depth: int) -> Generator[Node, None, None]:
"""
Iterate only the leaf nodes in the tree.
return x
path = self.parent_path(node)
+ assert path[-1]
assert path[-1] == node
path = path[:-1]
path.reverse()
for ancestor in path:
+ assert ancestor
if node != ancestor.right:
return ancestor
node = ancestor
+ raise Exception()
def _depth(self, node: Node, sofar: int) -> int:
depth_left = sofar + 1
def height(self):
return self.depth()
- def repr_traverse(self, padding: str, pointer: str, node: Node, has_right_sibling: bool) -> str:
+ def repr_traverse(
+ self, padding: str, pointer: str, node: Optional[Node], has_right_sibling: bool
+ ) -> str:
if node is not None:
viz = f'\n{padding}{pointer}{node.value}'
if has_right_sibling:
else:
pointer_left = "└──"
- viz += self.repr_traverse(padding, pointer_left, node.left, node.right is not None)
+ viz += self.repr_traverse(
+ padding, pointer_left, node.left, node.right is not None
+ )
viz += self.repr_traverse(padding, pointer_right, node.right, False)
return viz
return ""
else:
pointer_left = "├──"
- ret += self.repr_traverse('', pointer_left, self.root.left, self.root.left is not None)
+ ret += self.repr_traverse(
+ '', pointer_left, self.root.left, self.root.left is not None
+ )
ret += self.repr_traverse('', pointer_right, self.root.right, False)
return ret
if __name__ == '__main__':
import doctest
+
doctest.testmod()
import pickle
from contextlib import contextmanager
from functools import wraps
-from multiprocessing import shared_memory, RLock
+from multiprocessing import RLock, shared_memory
from typing import (
Any,
Dict,
Generator,
- KeysView,
ItemsView,
Iterator,
+ KeysView,
Optional,
ValuesView,
)
import datetime
import functools
-import holidays # type: ignore
import logging
import re
import sys
import antlr4 # type: ignore
import dateutil.easter
import dateutil.tz
+import holidays # type: ignore
import pytz
import acl
import bootstrap
+import decorator_utils
+from dateparse.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
+from dateparse.dateparse_utilsListener import dateparse_utilsListener # type: ignore
+from dateparse.dateparse_utilsParser import dateparse_utilsParser # type: ignore
from datetime_utils import (
TimeUnit,
- n_timeunits_from_base,
- datetime_to_date,
date_to_datetime,
+ datetime_to_date,
+ n_timeunits_from_base,
)
-from dateparse.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
-from dateparse.dateparse_utilsListener import dateparse_utilsListener # type: ignore
-from dateparse.dateparse_utilsParser import dateparse_utilsParser # type: ignore
-import decorator_utils
-
logger = logging.getLogger(__name__)
to timezone naive (i.e. tzinfo = None).
"""
dt = self.datetime
- if tz is not None:
- dt = dt.replace(tzinfo=None).astimezone(tz=tz)
+ if dt is not None:
+ if tz is not None:
+ dt = dt.replace(tzinfo=None).astimezone(tz=tz)
return dt
# -- helpers --
# Try pytz
try:
- tz = pytz.timezone(txt)
- if tz is not None:
- return tz
+ tz1 = pytz.timezone(txt)
+ if tz1 is not None:
+ return tz1
except Exception:
pass
# Try dateutil
try:
- tz = dateutil.tz.gettz(txt)
- if tz is not None:
- return tz
+ tz2 = dateutil.tz.gettz(txt)
+ if tz2 is not None:
+ return tz2
except Exception:
pass
# Try constructing an offset in seconds
try:
- sign = txt[0]
- if sign == '-' or sign == '+':
- sign = +1 if sign == '+' else -1
+ txt_sign = txt[0]
+ if txt_sign == '-' or txt_sign == '+':
+ sign = +1 if txt_sign == '+' else -1
hour = int(txt[1:3])
minute = int(txt[-2:])
offset = sign * (hour * 60 * 60) + sign * (minute * 60)
# Apply resudual adjustments to times here when we have a
# datetime.
self.datetime = self.datetime + self.timedelta
+ assert self.datetime
self.time = datetime.time(
self.datetime.hour,
self.datetime.minute,
elif unit == TimeUnit.HOURS:
self.timedelta = datetime.timedelta(hours=count)
else:
- raise ParseException()
+ raise ParseException(f'Invalid Unit: "{unit}"')
def exitDeltaPlusMinusExpr(
self, ctx: dateparse_utilsParser.DeltaPlusMinusExprContext
logger.exception(e)
print("Unrecognized.")
else:
+ assert dt
print(dt.strftime('%A %Y/%m/%d %H:%M:%S.%f %Z(%z)'))
sys.exit(0)
return datetime_to_date_and_time(dt)[1]
-class TimeUnit(enum.Enum):
+class TimeUnit(enum.IntEnum):
"""An enum to represent units with which we can compute deltas."""
MONDAYS = 0
from __future__ import annotations
-from abc import ABC, abstractmethod
import datetime
import glob
import logging
import pickle
import random
import sys
+import warnings
+from abc import ABC, abstractmethod
from types import SimpleNamespace
from typing import Any, List, NamedTuple, Optional, Set, Tuple
-import warnings
import numpy as np
from sklearn.model_selection import train_test_split # type:ignore
from sklearn.preprocessing import MinMaxScaler # type: ignore
-from ansi import bold, reset
import argparse_utils
import config
-from decorator_utils import timed
import executors
import parallelize as par
+from ansi import bold, reset
+from decorator_utils import timed
logger = logging.getLogger(__file__)
model_filename: Optional[str]
model_info_filename: Optional[str]
scaler_filename: Optional[str]
- training_score: float
- test_score: float
+ training_score: np.float64
+ test_score: np.float64
class TrainingBlueprint(ABC):
modelid_to_params[model.get_id()] = str(params)
best_model = None
- best_score = None
- best_test_score = None
- best_training_score = None
+ best_score: Optional[np.float64] = None
+ best_test_score: Optional[np.float64] = None
+ best_training_score: Optional[np.float64] = None
best_params = None
for model in smart_future.wait_any(models):
params = modelid_to_params[model.get_id()]
print(msg)
logger.info(msg)
+ assert best_training_score
+ assert best_test_score
+ assert best_params
(
scaler_filename,
model_filename,
and input_utils.yn_response("Write the model? [y,n]: ") == "y"
):
scaler_filename = f"{self.spec.basename}_scaler.sav"
- with open(scaler_filename, "wb") as f:
- pickle.dump(scaler, f)
+ with open(scaler_filename, "wb") as fb:
+ pickle.dump(scaler, fb)
msg = f"Wrote {scaler_filename}"
print(msg)
logger.info(msg)
model_filename = f"{self.spec.basename}_model.sav"
- with open(model_filename, "wb") as f:
- pickle.dump(model, f)
+ with open(model_filename, "wb") as fb:
+ pickle.dump(model, fb)
msg = f"Wrote {model_filename}"
print(msg)
logger.info(msg)
import datetime
import logging
import threading
+from typing import Any, List
import pychromecast
-from decorator_utils import memoized
import smart_home.device as dev
+from decorator_utils import memoized
logger = logging.getLogger(__name__)
class BaseChromecast(dev.Device):
- ccasts = []
+ ccasts: List[Any] = []
refresh_ts = None
browser = None
lock = threading.Lock()
super().__init__(name.strip(), mac.strip(), keywords)
ip = self.get_ip()
now = datetime.datetime.now()
- with BaseChromecast.lock as l:
+ with BaseChromecast.lock:
if (
- BaseChromecast.refresh_ts is None
- or (now - BaseChromecast.refresh_ts).total_seconds() > 60
+ BaseChromecast.refresh_ts is None
+ or (now - BaseChromecast.refresh_ts).total_seconds() > 60
):
logger.debug('Refreshing the shared chromecast info list')
if BaseChromecast.browser is not None:
BaseChromecast.browser.stop_discovery()
- BaseChromecast.ccasts, BaseChromecast.browser = pychromecast.get_chromecasts(
- timeout=15.0
- )
+ (
+ BaseChromecast.ccasts,
+ BaseChromecast.browser,
+ ) = pychromecast.get_chromecasts(timeout=15.0)
+ assert BaseChromecast.browser
atexit.register(BaseChromecast.browser.stop_discovery)
BaseChromecast.refresh_ts = now
self.cast = None
for cc in BaseChromecast.ccasts:
- if (
- cc.cast_info.host == ip
- and cc.cast_info.cast_type != 'group'
- ):
+ if cc.cast_info.host == ip and cc.cast_info.cast_type != 'group':
logger.debug(f'Found chromecast at {ip}: {cc}')
self.cast = cc
self.cast.wait(timeout=1.0)
if self.cast is None:
- raise Exception(f'Can\'t find ccast device at {ip}, is that really a ccast device?')
+ raise Exception(
+ f'Can\'t find ccast device at {ip}, is that really a ccast device?'
+ )
def is_idle(self):
return self.cast.is_idle
f"Chromecast({self.cast.socket_client.host!r}, port={self.cast.socket_client.port!r}, "
f"device={self.cast.cast_info.friendly_name!r})"
)
-
class Device(object):
def __init__(
- self,
- name: str,
- mac: str,
- keywords: Optional[List[str]],
+ self,
+ name: str,
+ mac: str,
+ keywords: Optional[List[str]],
):
self.name = name
self.mac = mac
self.keywords = keywords
self.arper = arper.Arper()
if keywords is not None:
- self.kws = keywords.split()
+ self.kws = keywords
else:
self.kws = []
"""Utilities for dealing with the smart lights."""
-from abc import abstractmethod
import datetime
import json
import logging
import re
import subprocess
import sys
+from abc import abstractmethod
from typing import Any, Dict, List, Optional, Tuple
-from overrides import overrides
import tinytuya as tt
+from overrides import overrides
import ansi
import argparse_utils
import config
import logging_utils
import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+from decorator_utils import memoized, timeout
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
+@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_light_command(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
def parse_color_string(color: str) -> Optional[Tuple[int, int, int]]:
m = re.match(
'r#?([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])([0-9a-fA-F][0-9a-fA-F])',
- color
+ color,
)
- if m is not None and len(m.group) == 3:
+ if m is not None and len(m.groups()) == 3:
red = int(m.group(0), 16)
green = int(m.group(1), 16)
blue = int(m.group(2), 16)
r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception("Can't reach Google?!")
@overrides
def is_off(self) -> bool:
# the bookcase one is set to 40% bright
txt = r.audio_transcription
- m = re.search(r"(\d+)% bright", txt)
- if m is not None:
- return int(m.group(1))
- if "is off" in txt:
- return 0
+ if txt is not None:
+ m = re.search(r"(\d+)% bright", txt)
+ if m is not None:
+ return int(m.group(1))
+ if "is off" in txt:
+ return 0
return None
@overrides
def get_children(self) -> List[str]:
return self.children
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ def command(self, cmd: str, child: str = None, extra_args: str = None) -> bool:
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
def make_color(self, color: str) -> bool:
raise NotImplementedError
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
+ @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
out = subprocess.getoutput(cmd)
"""Utilities for dealing with the smart outlets."""
-from abc import abstractmethod
import asyncio
import atexit
import datetime
import re
import subprocess
import sys
+from abc import abstractmethod
from typing import Any, Dict, List, Optional
from meross_iot.http_api import MerossHttpClient
from meross_iot.manager import MerossManager
+from overrides import overrides
import argparse_utils
import config
import logging_utils
import scott_secrets
import smart_home.device as dev
-from google_assistant import ask_google, GoogleResponse
-from decorator_utils import timeout, memoized
+from decorator_utils import memoized, timeout
+from google_assistant import GoogleResponse, ask_google
logger = logging.getLogger(__name__)
)
-@timeout(
- 5.0, use_signals=False, error_message="Timed out waiting for tplink.py"
-)
+@timeout(5.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def tplink_outlet_command(command: str) -> bool:
result = os.system(command)
signal = result & 0xFF
class BaseOutlet(dev.Device):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name.strip(), mac.strip(), keywords)
- self.info = None
@abstractmethod
def turn_on(self) -> bool:
)
return cmd
- def command(self, cmd: str, extra_args: str = None) -> bool:
+ def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
cmd = self.get_cmdline() + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
return tplink_outlet_command(cmd)
+ @overrides
def turn_on(self) -> bool:
return self.command('on')
+ @overrides
def turn_off(self) -> bool:
return self.command('off')
+ @overrides
def is_on(self) -> bool:
return self.get_on_duration_seconds() > 0
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
- @timeout(
- 10.0, use_signals=False, error_message="Timed out waiting for tplink.py"
- )
+ @timeout(10.0, use_signals=False, error_message="Timed out waiting for tplink.py")
def get_info(self) -> Optional[Dict]:
cmd = self.get_cmdline() + "-c info"
out = subprocess.getoutput(cmd)
for child in self.info["children"]:
self.children.append(child["id"])
- # override
+ @overrides
def get_cmdline(self, child: Optional[str] = None) -> str:
cmd = (
f"{config.config['smart_outlets_tplink_location']} -m {self.mac} "
cmd += f"-x {child} "
return cmd
- # override
- def command(
- self, cmd: str, child: str = None, extra_args: str = None
- ) -> bool:
+ @overrides
+ def command(self, cmd: str, extra_args: str = None, **kwargs) -> bool:
+ child: Optional[str] = kwargs.get('child', None)
cmd = self.get_cmdline(child) + f"-c {cmd}"
if extra_args is not None:
cmd += f" {extra_args}"
def get_children(self) -> List[str]:
return self.children
+ @overrides
def turn_on(self, child: str = None) -> bool:
return self.command("on", child)
+ @overrides
def turn_off(self, child: str = None) -> bool:
return self.command("off", child)
def parse_google_response(response: GoogleResponse) -> bool:
return response.success
+ @overrides
def turn_on(self) -> bool:
return GoogleOutlet.parse_google_response(
- ask_google('turn {self.goog_name()} on')
+ ask_google(f'turn {self.goog_name()} on')
)
+ @overrides
def turn_off(self) -> bool:
return GoogleOutlet.parse_google_response(
- ask_google('turn {self.goog_name()} off')
+ ask_google(f'turn {self.goog_name()} off')
)
+ @overrides
def is_on(self) -> bool:
r = ask_google(f'is {self.goog_name()} on?')
if not r.success:
return False
- return 'is on' in r.audio_transcription
+ if r.audio_transcription is not None:
+ return 'is on' in r.audio_transcription
+ raise Exception('Can\'t talk to Google right now!?')
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
this class.
"""
+
def __init__(self):
self.loop = asyncio.get_event_loop()
self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
- self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
+ self.password = (
+ os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
+ )
self.devices = self.loop.run_until_complete(self.find_meross_devices())
atexit.register(self.loop.close)
class MerossOutlet(BaseOutlet):
def __init__(self, name: str, mac: str, keywords: str = '') -> None:
super().__init__(name, mac, keywords)
- self.meross_wrapper = None
- self.device = None
+ self.meross_wrapper: Optional[MerossWrapper] = None
+ self.device: Optional[Any] = None
def lazy_initialize_device(self):
"""If we make too many calls to Meross they will block us; only talk
if self.device is None:
raise Exception(f'{self.name} is not a known Meross device?!')
+ @overrides
def turn_on(self) -> bool:
self.lazy_initialize_device()
- self.meross_wrapper.loop.run_until_complete(
- self.device.async_turn_on()
- )
+ assert self.meross_wrapper
+ assert self.device
+ self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
return True
+ @overrides
def turn_off(self) -> bool:
self.lazy_initialize_device()
- self.meross_wrapper.loop.run_until_complete(
- self.device.async_turn_off()
- )
+ assert self.meross_wrapper
+ assert self.device
+ self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
return True
+ @overrides
def is_on(self) -> bool:
self.lazy_initialize_device()
+ assert self.device
return self.device.is_on()
+ @overrides
def is_off(self) -> bool:
return not self.is_on()
import config
import file_utils
import logical_search
-import smart_home.device as device
import smart_home.cameras as cameras
import smart_home.chromecasts as chromecasts
+import smart_home.device as device
import smart_home.lights as lights
import smart_home.outlets as outlets
args = config.add_commandline_args(
f"Smart Home Registry ({__file__})",
- "Args related to the smart home configuration registry."
+ "Args related to the smart home configuration registry.",
)
args.add_argument(
'--smart_home_registry_file_location',
class SmartHomeRegistry(object):
def __init__(
- self,
- registry_file: Optional[str] = None,
- filters: List[str] = ['smart'],
+ self,
+ registry_file: Optional[str] = None,
+ filters: List[str] = ['smart'],
) -> None:
self._macs_by_name = {}
self._keywords_by_name = {}
# Read the disk config file...
if registry_file is None:
- registry_file = config.config[
- 'smart_home_registry_file_location'
- ]
+ registry_file = config.config['smart_home_registry_file_location']
assert file_utils.does_file_exist(registry_file)
logger.debug(f'Reading {registry_file}')
- with open(registry_file, "r") as f:
- contents = f.readlines()
+ with open(registry_file, "r") as rf:
+ contents = rf.readlines()
# Parse the contents...
for line in contents:
#!/usr/bin/env python3
import re
-from typing import Optional, TypeVar, Tuple
+from typing import Optional, Tuple, TypeVar
import math_utils
-T = TypeVar('T', bound='CentCount')
-
-
class CentCount(object):
"""A class for representing monetary amounts potentially with
different currencies meant to avoid floating point rounding
issues by treating amount as a simple integral count of cents.
"""
- def __init__ (
- self,
- centcount,
- currency: str = 'USD',
- *,
- strict_mode = False
- ):
+ def __init__(self, centcount, currency: str = 'USD', *, strict_mode=False):
self.strict_mode = strict_mode
if isinstance(centcount, str):
ret = CentCount._parse(centcount)
if not currency:
self.currency: Optional[str] = None
else:
- self.currency: Optional[str] = currency
+ self.currency = currency
def __repr__(self):
a = float(self.centcount)
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount = self.centcount + other.centcount,
- currency = self.currency
+ centcount=self.centcount + other.centcount, currency=self.currency
)
else:
raise TypeError('Incompatible currencies in add expression')
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount = self.centcount - other.centcount,
- currency = self.currency
+ centcount=self.centcount - other.centcount, currency=self.currency
)
else:
raise TypeError('Incompatible currencies in add expression')
raise TypeError('can not multiply monetary quantities')
else:
return CentCount(
- centcount = int(self.centcount * float(other)),
- currency = self.currency
+ centcount=int(self.centcount * float(other)), currency=self.currency
)
def __truediv__(self, other):
raise TypeError('can not divide monetary quantities')
else:
return CentCount(
- centcount = int(float(self.centcount) / float(other)),
- currency = self.currency
+ centcount=int(float(self.centcount) / float(other)),
+ currency=self.currency,
)
def __int__(self):
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount = other.centcount - self.centcount,
- currency = self.currency
+ centcount=other.centcount - self.centcount, currency=self.currency
)
else:
raise TypeError('Incompatible currencies in sub expression')
raise TypeError('In strict_mode only two moneys can be added')
else:
return CentCount(
- centcount = int(other) - self.centcount,
- currency = self.currency
+ centcount=int(other) - self.centcount, currency=self.currency
)
__rmul__ = __mul__
if other is None:
return False
if isinstance(other, CentCount):
- return (
- self.centcount == other.centcount and
- self.currency == other.currency
- )
+ return self.centcount == other.centcount and self.currency == other.currency
if self.strict_mode:
raise TypeError("In strict mode only two CentCounts can be compared")
else:
def __hash__(self):
return self.__repr__
- CENTCOUNT_RE = re.compile("^([+|-]?)(\d+)(\.\d+)$")
- CURRENCY_RE = re.compile("^[A-Z][A-Z][A-Z]$")
+ CENTCOUNT_RE = re.compile(r"^([+|-]?)(\d+)(\.\d+)$")
+ CURRENCY_RE = re.compile(r"^[A-Z][A-Z][A-Z]$")
@classmethod
def _parse(cls, s: str) -> Optional[Tuple[int, str]]:
return None
@classmethod
- def parse(cls, s: str) -> T:
+ def parse(cls, s: str) -> 'CentCount':
chunks = CentCount._parse(s)
if chunks is not None:
return CentCount(chunks[0], chunks[1])
#!/usr/bin/env python3
-from decimal import Decimal
import re
-from typing import Optional, TypeVar, Tuple
+from decimal import Decimal
+from typing import Optional, Tuple, TypeVar
import math_utils
-T = TypeVar('T', bound='Money')
-
-
class Money(object):
"""A class for representing monetary amounts potentially with
different currencies.
"""
- def __init__ (
- self,
- amount: Decimal = Decimal("0"),
- currency: str = 'USD',
- *,
- strict_mode = False
+ def __init__(
+ self,
+ amount: Decimal = Decimal("0"),
+ currency: str = 'USD',
+ *,
+ strict_mode=False,
):
self.strict_mode = strict_mode
if isinstance(amount, str):
if not currency:
self.currency: Optional[str] = None
else:
- self.currency: Optional[str] = currency
+ self.currency = currency
def __repr__(self):
a = float(self.amount)
def __add__(self, other):
if isinstance(other, Money):
if self.currency == other.currency:
- return Money(
- amount = self.amount + other.amount,
- currency = self.currency
- )
+ return Money(amount=self.amount + other.amount, currency=self.currency)
else:
raise TypeError('Incompatible currencies in add expression')
else:
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount = self.amount + Decimal(float(other)),
- currency = self.currency
+ amount=self.amount + Decimal(float(other)), currency=self.currency
)
def __sub__(self, other):
if isinstance(other, Money):
if self.currency == other.currency:
- return Money(
- amount = self.amount - other.amount,
- currency = self.currency
- )
+ return Money(amount=self.amount - other.amount, currency=self.currency)
else:
raise TypeError('Incompatible currencies in add expression')
else:
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount = self.amount - Decimal(float(other)),
- currency = self.currency
+ amount=self.amount - Decimal(float(other)), currency=self.currency
)
def __mul__(self, other):
raise TypeError('can not multiply monetary quantities')
else:
return Money(
- amount = self.amount * Decimal(float(other)),
- currency = self.currency
+ amount=self.amount * Decimal(float(other)), currency=self.currency
)
def __truediv__(self, other):
raise TypeError('can not divide monetary quantities')
else:
return Money(
- amount = self.amount / Decimal(float(other)),
- currency = self.currency
+ amount=self.amount / Decimal(float(other)), currency=self.currency
)
def __float__(self):
def __rsub__(self, other):
if isinstance(other, Money):
if self.currency == other.currency:
- return Money(
- amount = other.amount - self.amount,
- currency = self.currency
- )
+ return Money(amount=other.amount - self.amount, currency=self.currency)
else:
raise TypeError('Incompatible currencies in sub expression')
else:
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount = Decimal(float(other)) - self.amount,
- currency = self.currency
+ amount=Decimal(float(other)) - self.amount, currency=self.currency
)
__rmul__ = __mul__
if other is None:
return False
if isinstance(other, Money):
- return (
- self.amount == other.amount and
- self.currency == other.currency
- )
+ return self.amount == other.amount and self.currency == other.currency
if self.strict_mode:
raise TypeError("In strict mode only two Moneys can be compared")
else:
def __hash__(self):
return self.__repr__
- AMOUNT_RE = re.compile("^([+|-]?)(\d+)(\.\d+)$")
- CURRENCY_RE = re.compile("^[A-Z][A-Z][A-Z]$")
+ AMOUNT_RE = re.compile(r"^([+|-]?)(\d+)(\.\d+)$")
+ CURRENCY_RE = re.compile(r"^[A-Z][A-Z][A-Z]$")
@classmethod
def _parse(cls, s: str) -> Optional[Tuple[Decimal, str]]:
return None
@classmethod
- def parse(cls, s: str) -> T:
+ def parse(cls, s: str) -> 'Money':
chunks = Money._parse(s)
if chunks is not None:
return Money(chunks[0], chunks[1])
class Rate(object):
def __init__(
- self,
- multiplier: Optional[float] = None,
- *,
- percentage: Optional[float] = None,
- percent_change: Optional[float] = None,
+ self,
+ multiplier: Optional[float] = None,
+ *,
+ percentage: Optional[float] = None,
+ percent_change: Optional[float] = None,
):
count = 0
if multiplier is not None:
multiplier = multiplier.replace('%', '')
m = float(multiplier)
m /= 100
- self.multiplier = m
+ self.multiplier: float = m
else:
self.multiplier = multiplier
count += 1
def __hash__(self):
return self.multiplier
- def __repr__(self,
- *,
- relative=False,
- places=3):
+ def __repr__(self, *, relative=False, places=3):
if relative:
percentage = (self.multiplier - 1.0) * 100.0
else: