):
self.weather_data[today].high = high
continue
- most_common_condition = list_utils.most_common_item(conditions[dt])
+ most_common_condition = list_utils.most_common(conditions[dt])
icon = icon_by_condition.get(most_common_condition, '?')
if dt == now.date() and now.hour > 18 and condition == 'Clear':
icon = '🌙'
def now_pacific() -> datetime.datetime:
"""
- What time is it? Result in US/Pacifit time (PST/PDT)
+ What time is it? Result in US/Pacific time (PST/PDT)
"""
return replace_timezone(now(), pytz.timezone("US/Pacific"))
#!/usr/bin/env python3
from itertools import islice
-from typing import Any, Callable, Dict, Iterator, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Tuple
def init_or_inc(
yield {key: value for (key, value) in islice(items, x, x + size)}
-def coalesce_by_creating_list(key, v1, v2):
+def coalesce_by_creating_list(key, new_value, old_value):
from list_utils import flatten
- return flatten([v1, v2])
+ return flatten([new_value, old_value])
-def coalesce_by_creating_set(key, v1, v2):
- return set(coalesce_by_creating_list(key, v1, v2))
+def coalesce_by_creating_set(key, new_value, old_value):
+ return set(coalesce_by_creating_list(key, new_value, old_value))
-def raise_on_duplicated_keys(key, v1, v2):
+def coalesce_last_write_wins(key, new_value, old_value):
+ return new_value
+
+
+def coalesce_first_write_wins(key, new_value, old_value):
+ return old_value
+
+
+def raise_on_duplicated_keys(key, new_value, old_value):
raise Exception(f'Key {key} is duplicated in more than one input dict.')
*,
aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
) -> Dict[Any, Any]:
- """Merge N dicts into one dict containing the union of all keys/values in
- the input dicts. When keys collide, apply the aggregation_function which,
- by default, creates a list of values. See also coalesce_by_creating_set or
- provide a user defined aggregation_function.
+ """Merge N dicts into one dict containing the union of all keys /
+ values in the input dicts. When keys collide, apply the
+ aggregation_function which, by default, creates a list of values.
+ See also several other alternative functions for coalescing values
+ (coalesce_by_creating_set, coalesce_first_write_wins,
+ coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
+ custom helper function.
>>> a = {'a': 1, 'b': 2}
>>> b = {'b': 1, 'c': 2, 'd': 3}
>>> coalesce([a, b, c])
{'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
+ >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
+ {'a': 1, 'b': 1, 'c': 1, 'd': 2}
+
+ >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
+ Traceback (most recent call last):
+ ...
+ Exception: Key b is duplicated in more than one input dict.
+
"""
out: Dict[Any, Any] = {}
for d in inputs:
return min(d.keys())
+def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
+ """Given two parallel lists (keys and values), create and return
+ a dict.
+
+ >>> k = ['name', 'phone', 'address', 'zip']
+ >>> v = ['scott', '555-1212', '123 main st.', '12345']
+ >>> parallel_lists_to_dict(k, v)
+ {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+
+ """
+ if len(keys) != len(values):
+ raise Exception("Parallel keys and values lists must have the same length")
+ return dict(zip(keys, values))
+
+
+def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
+ """
+ >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+ >>> (k, v) = dict_to_key_value_lists(d)
+ >>> k
+ ['name', 'phone', 'address', 'zip']
+ >>> v
+ ['scott', '555-1212', '123 main st.', '12345']
+
+ """
+ r = ([], [])
+ for (k, v) in d.items():
+ r[0].append(k)
+ r[1].append(v)
+ return r
+
+
if __name__ == '__main__':
import doctest
doctest.testmod()
policy: RemoteWorkerSelectionPolicy) -> None:
super().__init__()
self.workers = workers
+ self.policy = policy
self.worker_count = 0
for worker in self.workers:
self.worker_count += worker.count
msg = f"We need somewhere to schedule work; count was {self.worker_count}"
logger.critical(msg)
raise Exception(msg)
- self.policy = policy
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
self._helper_executor = fut.ThreadPoolExecutor(
self.status = RemoteExecutorStatus(self.worker_count)
self.total_bundles_submitted = 0
logger.debug(
- f'Creating remote processpool with {self.worker_count} remote endpoints.'
+ f'Creating remote processpool with {self.worker_count} remote worker threads.'
)
def is_worker_available(self) -> bool:
import re
import subprocess
import sys
-from typing import Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Set
+
+import tinytuya as tt
import argparse_utils
import config
)
+class TuyaLight(Light):
+ ids_by_mac = {
+ '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
+ '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
+ '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
+ '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
+ '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
+ '80:7D:3A:58:37:02': '07445340807d3a583702',
+ }
+ keys_by_mac = {
+ '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
+ '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
+ '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
+ '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
+ '80:7D:3A:77:3B:F5': '27ab921fe4633519',
+ '80:7D:3A:58:37:02': '8559b5416bfa0c05',
+ }
+
+ def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+ from subprocess import Popen, PIPE
+ super().__init__(name, mac, keywords)
+ mac = mac.upper()
+ if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
+ raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
+ self.devid = TuyaLight.ids_by_mac[mac]
+ self.key = TuyaLight.keys_by_mac[mac]
+ try:
+ pid = Popen(['maclookup', mac], stdout=PIPE)
+ ip = pid.communicate()[0]
+ ip = ip[:-1]
+ except Exception:
+ ip = '0.0.0.0'
+ self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+
+ def turn_on(self) -> bool:
+ self.bulb.turn_on()
+ return True
+
+ def turn_off(self) -> bool:
+ self.bulb.turn_off()
+ return True
+
+ def get_status(self) -> Dict[str, Any]:
+ return self.bulb.status()
+
+ def is_on(self) -> bool:
+ s = self.get_status()
+ return s['dps']['1']
+
+ def is_off(self) -> bool:
+ return not self.is_on()
+
+ def get_dimmer_level(self) -> Optional[int]:
+ s = self.get_status()
+ return s['dps']['3']
+
+ def set_dimmer_level(self, level: int) -> bool:
+ self.bulb.set_brightness(level)
+ return True
+
+ def make_color(self, color: str) -> bool:
+ self.bulb.set_colour(255,0,0)
+ return True
+
+
class TPLinkLight(Light):
def __init__(self, name: str, mac: str, keywords: str = "") -> None:
super().__init__(name, mac, keywords)
)
-def group_google_lights(lights: List[Light]) -> List[Light]:
- bookcase_group = []
- diningroom_group = []
- for light in lights:
- name = light.get_name()
- if "bookcase_light_" in name:
- bookcase_group.append(light)
- elif "diningroom_light_" in name:
- diningroom_group.append(light)
-
- did_bookcase = False
- did_diningroom = False
- ret = []
- for light in lights:
- name = light.get_name()
- if "bookcase_light_" in name:
- if len(bookcase_group) == 4 and not did_bookcase:
- ret.append(
- GoogleLightGroup(
- "bookcase_lights",
- bookcase_group,
- "perm wifi light smart goog dimmer"
- )
- )
- did_bookcase = True
- elif "diningroom_light_" in name:
- if len(diningroom_group) == 2 and not did_diningroom:
- ret.append(
- GoogleLightGroup(
- "dining_room_lights",
- diningroom_group,
- "intermittent wifi light smart goog dimmer"
- )
- )
- did_diningroom = True
- else:
- ret.append(light)
- return ret
-
-
class LightingConfig(object):
"""Representation of the smart light device config."""
self.keywords_by_mac[mac] = keywords
self.names_by_mac[mac] = name
- if "bookcase_light_" in name:
- bookcase_lights.append(mac)
- elif "diningroom_light_" in name:
- diningroom_lights.append(mac)
- else:
- self.index_light(name, keywords, mac)
-
- name = 'bookcase_lights'
- group = []
- keywords = 'perm wifi light smart goog dimmer'
- for b in bookcase_lights:
- group.append(self.get_light_by_mac(b))
- self.bookcase_group = GoogleLightGroup(
- name,
- group,
- keywords,
- )
- mac = self.bookcase_group.get_mac()
- self.macs_by_name[name] = mac
- self._keywords_by_name[name] = keywords
- self.keywords_by_mac[mac] = keywords
- self.names_by_mac[mac] = name
- self.index_light(name, keywords, mac)
-
- name = 'dining_room_lights'
- group = []
- for b in diningroom_lights:
- group.append(self.get_light_by_mac(b))
- self.diningroom_group = GoogleLightGroup(
- name,
- group,
- keywords,
- )
- mac = self.diningroom_group.get_mac()
- self.macs_by_name[name] = mac
- self._keywords_by_name[name] = keywords
- self.keywords_by_mac[mac] = keywords
- self.names_by_mac[mac] = name
- self.index_light(name, keywords, mac)
+# if "bookcase_light_" in name:
+# bookcase_lights.append(mac)
+# elif "diningroom_light_" in name:
+# diningroom_lights.append(mac)
+# else:
+ self.index_light(name, keywords, mac)
+
+ # name = 'bookcase_lights'
+ # group = []
+ # keywords = 'perm wifi light smart goog dimmer'
+ # for b in bookcase_lights:
+ # group.append(self.get_light_by_mac(b))
+ # self.bookcase_group = GoogleLightGroup(
+ # name,
+ # group,
+ # keywords,
+ # )
+ # mac = self.bookcase_group.get_mac()
+ # self.macs_by_name[name] = mac
+ # self._keywords_by_name[name] = keywords
+ # self.keywords_by_mac[mac] = keywords
+ # self.names_by_mac[mac] = name
+ # self.index_light(name, keywords, mac)
+
+ # name = 'dining_room_lights'
+ # group = []
+ # for b in diningroom_lights:
+ # group.append(self.get_light_by_mac(b))
+ # self.diningroom_group = GoogleLightGroup(
+ # name,
+ # group,
+ # keywords,
+ # )
+ # mac = self.diningroom_group.get_mac()
+ # self.macs_by_name[name] = mac
+ # self._keywords_by_name[name] = keywords
+ # self.keywords_by_mac[mac] = keywords
+ # self.names_by_mac[mac] = name
+ # self.index_light(name, keywords, mac)
def index_light(self, name: str, keywords: str, mac: str) -> None:
properties = [("name", name)]
properties.append((key, value))
else:
tags.add(kw)
- self.corpus.add_doc(
- logical_search.Document(
- docid=mac,
- tags=tags,
- properties=properties,
- reference=None,
- )
+ light = logical_search.Document(
+ docid=mac,
+ tags=tags,
+ properties=properties,
+ reference=None,
)
+ self.corpus.add_doc(light)
def __repr__(self) -> str:
s = "Known devices:\n"
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return group_google_lights(retval)
+ return retval
def get_light_by_mac(self, mac: str) -> Optional[Light]:
if mac in self.keywords_by_mac:
return self.bookcase_group
elif name == 'dining_room_lights':
return self.diningroom_group
- elif "tplink" in kws.lower():
+ elif 'tplink' in kws.lower():
return TPLinkLight(name, mac, kws)
+ elif 'tuya' in kws.lower():
+ return TuyaLight(name, mac, kws)
else:
return GoogleLight(name, mac, kws)
return None
light = self.get_light_by_mac(mac)
if light is not None:
retval.append(light)
- return group_google_lights(retval)
+ return retval
return lst
+def remove_list_if_one_element(lst: List[Any]) -> Any:
+ """
+ Remove the list and return the 0th element iff its length is one.
+
+ >>> remove_list_if_one_element([1234])
+ 1234
+
+ >>> remove_list_if_one_element([1, 2, 3, 4])
+ [1, 2, 3, 4]
+
+ """
+ if len(lst) == 1:
+ return lst[0]
+ else:
+ return lst
+
+
def population_counts(lst: List[Any]) -> Mapping[Any, int]:
"""
Return a population count mapping for the list (i.e. the keys are
return Counter(lst)
-def most_common_item(lst: List[Any]) -> Any:
+def most_common(lst: List[Any], *, count=1) -> Any:
"""
Return the most common item in the list. In the case of ties,
which most common item is returned will be random.
- >>> most_common_item([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4])
+ >>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4])
3
+ >>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4], count=2)
+ [3, 1]
+
"""
- return population_counts(lst).most_common(1)[0][0]
+ p = population_counts(lst)
+ return remove_list_if_one_element([_[0] for _ in p.most_common()[0:count]])
-def least_common_item(lst: List[Any]) -> Any:
+def least_common(lst: List[Any], *, count=1) -> Any:
"""
Return the least common item in the list. In the case of
ties, which least common item is returned will be random.
- >>> least_common_item([1, 1, 1, 2, 2, 3, 3, 3, 4])
+ >>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4])
4
+ >>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4], count=2)
+ [4, 2]
+
"""
- return population_counts(lst).most_common()[-1][0]
+ p = population_counts(lst)
+ mc = p.most_common()[-count:]
+ mc.reverse()
+ return remove_list_if_one_element([_[0] for _ in mc])
def dedup_list(lst: List[Any]) -> List[Any]:
return dedup_list(lst)
+def contains_duplicates(lst: List[Any]) -> bool:
+ """
+ Does the list contian duplicate elements or not?
+
+ >>> lst = [1, 2, 1, 3, 3, 4, 4, 5, 6, 1, 3, 4]
+ >>> contains_duplicates(lst)
+ True
+
+ >>> contains_duplicates(dedup_list(lst))
+ False
+
+ """
+ seen = set()
+ for _ in lst:
+ if _ in seen:
+ return True
+ seen.add(_)
+ return False
+
+
+def all_unique(lst: List[Any]) -> bool:
+ """
+ Inverted alias for contains_duplicates.
+ """
+ return not contains_duplicates(lst)
+
+
+def transpose(lst: List[Any]) -> List[Any]:
+ """
+ Transpose a list of lists.
+
+ >>> lst = [[1, 2], [3, 4], [5, 6]]
+ >>> transpose(lst)
+ [[1, 3, 5], [2, 4, 6]]
+
+ """
+ transposed = zip(*lst)
+ return [list(_) for _ in transposed]
+
+
def ngrams(lst: Sequence[Any], n):
"""
Return the ngrams in the sequence.
--- /dev/null
+#!/usr/bin/env python3
+
+from dataclasses import dataclass
+import logging
+import platform
+from typing import Optional
+
+import config
+
+logger = logging.getLogger(__name__)
+args = config.add_commandline_args(
+ f'({__file__})',
+ 'Args related to __file__'
+)
+
+args.add_argument(
+ '--site_config_location',
+ default='AUTO',
+ const='AUTO',
+ nargs='?',
+ choices=('HOUSE', 'CABIN', 'AUTO'),
+ help='Where are we, HOUSE or CABIN?'
+)
+
+
+@dataclass
+class SiteConfig(object):
+ network: str
+ network_netmask: str
+ network_router_ip: str
+
+
+def get_location():
+ location = config.config['site_config_location']
+ if location == 'AUTO':
+ hostname = platform.node()
+ if '.house' in hostname:
+ location = 'HOUSE'
+ elif '.cabin' in hostname:
+ location = 'CABIN'
+ else:
+ raise Exception(f'Unknown hostname {hostname}, help.')
+ return location
+
+
+def get_config():
+ location = get_location()
+ if location == 'HOUSE':
+ return SiteConfig(
+ network = '10.0.0.0/24',
+ network_netmask = '255.255.255.0',
+ network_router_ip = '10.0.0.1',
+ )
+ elif location == 'CABIN':
+ return SiteConfig(
+ network = '192.168.0.0/24',
+ network_netmask = '255.255.255.0',
+ network_router_ip = '192.168.0.1',
+ )
+ else:
+ raise Exception('Unknown site location')
>>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F')
'34:29:8F:12:0D:2F'
+ >>> extract_mac_address('? (10.0.0.30) at d8:5d:e2:34:54:86 on em0 expires in 1176 seconds [ethernet]')
+ 'd8:5d:e2:34:54:86'
+
"""
if not is_full_string(in_str):
return None
return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0'
+def ip_v4_sort_key(txt: str) -> str:
+ """Turn an IPv4 address into a tuple for sorting purposes.
+
+ >>> ip_v4_sort_key('10.0.0.18')
+ (10, 0, 0, 18)
+
+ """
+ if not is_ip_v4(txt):
+ print(f"not IP: {txt}")
+ return None
+ return tuple([int(x) for x in txt.split('.')])
+
+
if __name__ == '__main__':
import doctest
doctest.testmod()