Adds site_config; adds Tuya lights. Bugfixes.
authorScott Gasch <[email protected]>
Sun, 24 Oct 2021 06:49:08 +0000 (23:49 -0700)
committerScott Gasch <[email protected]>
Sun, 24 Oct 2021 06:49:08 +0000 (23:49 -0700)
cached/weather_data.py
datetime_utils.py
dict_utils.py
executors.py
light_utils.py
list_utils.py
site_config.py [new file with mode: 0644]
string_utils.py

index 8d49736bae0cf264fc7447c150011f616ccec388..89c330aba5e17f20fd537f1d60b490bfa43e25c2 100644 (file)
@@ -164,7 +164,7 @@ class CachedWeatherData(persistent.Persistent):
                 ):
                     self.weather_data[today].high = high
                 continue
-            most_common_condition = list_utils.most_common_item(conditions[dt])
+            most_common_condition = list_utils.most_common(conditions[dt])
             icon = icon_by_condition.get(most_common_condition, '?')
             if dt == now.date() and now.hour > 18 and condition == 'Clear':
                 icon = '🌙'
index c100057d3a306a0efb482418479a3857dcc07b93..e31edc29428ad57a0d02282e9ee4ccdc8c55c546 100644 (file)
@@ -42,7 +42,7 @@ def now() -> datetime.datetime:
 
 def now_pacific() -> datetime.datetime:
     """
-    What time is it?  Result in US/Pacifit time (PST/PDT)
+    What time is it?  Result in US/Pacific time (PST/PDT)
     """
     return replace_timezone(now(), pytz.timezone("US/Pacific"))
 
index 92fd1e06a21d06a3205a138639f9cf013cb055ec..7b0edb50a87a5854f20326408cfc0a03d22408dc 100644 (file)
@@ -1,7 +1,7 @@
 #!/usr/bin/env python3
 
 from itertools import islice
-from typing import Any, Callable, Dict, Iterator, Tuple
+from typing import Any, Callable, Dict, Iterator, List, Tuple
 
 
 def init_or_inc(
@@ -44,16 +44,24 @@ def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]:
         yield {key: value for (key, value) in islice(items, x, x + size)}
 
 
-def coalesce_by_creating_list(key, v1, v2):
+def coalesce_by_creating_list(key, new_value, old_value):
     from list_utils import flatten
-    return flatten([v1, v2])
+    return flatten([new_value, old_value])
 
 
-def coalesce_by_creating_set(key, v1, v2):
-    return set(coalesce_by_creating_list(key, v1, v2))
+def coalesce_by_creating_set(key, new_value, old_value):
+    return set(coalesce_by_creating_list(key, new_value, old_value))
 
 
-def raise_on_duplicated_keys(key, v1, v2):
+def coalesce_last_write_wins(key, new_value, old_value):
+    return new_value
+
+
+def coalesce_first_write_wins(key, new_value, old_value):
+    return old_value
+
+
+def raise_on_duplicated_keys(key, new_value, old_value):
     raise Exception(f'Key {key} is duplicated in more than one input dict.')
 
 
@@ -62,10 +70,13 @@ def coalesce(
         *,
         aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list
 ) -> Dict[Any, Any]:
-    """Merge N dicts into one dict containing the union of all keys/values in
-    the input dicts.  When keys collide, apply the aggregation_function which,
-    by default, creates a list of values.  See also coalesce_by_creating_set or
-    provide a user defined aggregation_function.
+    """Merge N dicts into one dict containing the union of all keys /
+    values in the input dicts.  When keys collide, apply the
+    aggregation_function which, by default, creates a list of values.
+    See also several other alternative functions for coalescing values
+    (coalesce_by_creating_set, coalesce_first_write_wins,
+    coalesce_last_write_wins, raise_on_duplicated_keys) or provide a
+    custom helper function.
 
     >>> a = {'a': 1, 'b': 2}
     >>> b = {'b': 1, 'c': 2, 'd': 3}
@@ -73,6 +84,14 @@ def coalesce(
     >>> coalesce([a, b, c])
     {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
 
+    >>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
+    {'a': 1, 'b': 1, 'c': 1, 'd': 2}
+
+    >>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
+    Traceback (most recent call last):
+    ...
+    Exception: Key b is duplicated in more than one input dict.
+
     """
     out: Dict[Any, Any] = {}
     for d in inputs:
@@ -177,6 +196,38 @@ def min_key(d: Dict[Any, Any]) -> Any:
     return min(d.keys())
 
 
+def parallel_lists_to_dict(keys: List[Any], values: List[Any]) -> Dict[Any, Any]:
+    """Given two parallel lists (keys and values), create and return
+    a dict.
+
+    >>> k = ['name', 'phone', 'address', 'zip']
+    >>> v = ['scott', '555-1212', '123 main st.', '12345']
+    >>> parallel_lists_to_dict(k, v)
+    {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+
+    """
+    if len(keys) != len(values):
+        raise Exception("Parallel keys and values lists must have the same length")
+    return dict(zip(keys, values))
+
+
+def dict_to_key_value_lists(d: Dict[Any, Any]) -> Tuple[List[Any], List[Any]]:
+    """
+    >>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
+    >>> (k, v) = dict_to_key_value_lists(d)
+    >>> k
+    ['name', 'phone', 'address', 'zip']
+    >>> v
+    ['scott', '555-1212', '123 main st.', '12345']
+
+    """
+    r = ([], [])
+    for (k, v) in d.items():
+        r[0].append(k)
+        r[1].append(v)
+    return r
+
+
 if __name__ == '__main__':
     import doctest
     doctest.testmod()
index 0b4d80ed1b8b0b387b4b6a7363c13c268e265a5c..3cb0a916c080128e63a23600db76c07e93956ec9 100644 (file)
@@ -476,6 +476,7 @@ class RemoteExecutor(BaseExecutor):
                  policy: RemoteWorkerSelectionPolicy) -> None:
         super().__init__()
         self.workers = workers
+        self.policy = policy
         self.worker_count = 0
         for worker in self.workers:
             self.worker_count += worker.count
@@ -483,7 +484,6 @@ class RemoteExecutor(BaseExecutor):
             msg = f"We need somewhere to schedule work; count was {self.worker_count}"
             logger.critical(msg)
             raise Exception(msg)
-        self.policy = policy
         self.policy.register_worker_pool(self.workers)
         self.cv = threading.Condition()
         self._helper_executor = fut.ThreadPoolExecutor(
@@ -493,7 +493,7 @@ class RemoteExecutor(BaseExecutor):
         self.status = RemoteExecutorStatus(self.worker_count)
         self.total_bundles_submitted = 0
         logger.debug(
-            f'Creating remote processpool with {self.worker_count} remote endpoints.'
+            f'Creating remote processpool with {self.worker_count} remote worker threads.'
         )
 
     def is_worker_available(self) -> bool:
index 8101a32523c486d0bc2ff782be8435297c18bcd9..6ca6e71db2329a65402116a5cf769048023521f6 100644 (file)
@@ -10,7 +10,9 @@ import os
 import re
 import subprocess
 import sys
-from typing import Dict, List, Optional, Set
+from typing import Any, Dict, List, Optional, Set
+
+import tinytuya as tt
 
 import argparse_utils
 import config
@@ -178,6 +180,71 @@ class GoogleLight(Light):
         )
 
 
+class TuyaLight(Light):
+    ids_by_mac = {
+        '68:C6:3A:DE:1A:94': '8844664268c63ade1a94',
+        '68:C6:3A:DE:27:1A': '8844664268c63ade271a',
+        '68:C6:3A:DE:1D:95': '8844664268c63ade1d95',
+        '68:C6:3A:DE:19:B3': '8844664268c63ade19b3',
+        '80:7D:3A:77:3B:F5': '07445340807d3a773bf5',
+        '80:7D:3A:58:37:02': '07445340807d3a583702',
+    }
+    keys_by_mac = {
+        '68:C6:3A:DE:1A:94': '237f19b1b3d49c36',
+        '68:C6:3A:DE:27:1A': '237f19b1b3d49c36',
+        '68:C6:3A:DE:1D:95': '04b90fc5cd7625d8',
+        '68:C6:3A:DE:19:B3': '2d601f2892f1aefd',
+        '80:7D:3A:77:3B:F5': '27ab921fe4633519',
+        '80:7D:3A:58:37:02': '8559b5416bfa0c05',
+    }
+
+    def __init__(self, name: str, mac: str, keywords: str = "") -> None:
+        from subprocess import Popen, PIPE
+        super().__init__(name, mac, keywords)
+        mac = mac.upper()
+        if mac not in TuyaLight.ids_by_mac or mac not in TuyaLight.keys_by_mac:
+            raise Exception(f'{mac} is unknown; add it to ids_by_mac and keys_by_mac')
+        self.devid = TuyaLight.ids_by_mac[mac]
+        self.key = TuyaLight.keys_by_mac[mac]
+        try:
+            pid = Popen(['maclookup', mac], stdout=PIPE)
+            ip = pid.communicate()[0]
+            ip = ip[:-1]
+        except Exception:
+            ip = '0.0.0.0'
+        self.bulb = tt.BulbDevice(self.devid, ip, local_key=self.key)
+
+    def turn_on(self) -> bool:
+        self.bulb.turn_on()
+        return True
+
+    def turn_off(self) -> bool:
+        self.bulb.turn_off()
+        return True
+
+    def get_status(self) -> Dict[str, Any]:
+        return self.bulb.status()
+
+    def is_on(self) -> bool:
+        s = self.get_status()
+        return s['dps']['1']
+
+    def is_off(self) -> bool:
+        return not self.is_on()
+
+    def get_dimmer_level(self) -> Optional[int]:
+        s = self.get_status()
+        return s['dps']['3']
+
+    def set_dimmer_level(self, level: int) -> bool:
+        self.bulb.set_brightness(level)
+        return True
+
+    def make_color(self, color: str) -> bool:
+        self.bulb.set_colour(255,0,0)
+        return True
+
+
 class TPLinkLight(Light):
     def __init__(self, name: str, mac: str, keywords: str = "") -> None:
         super().__init__(name, mac, keywords)
@@ -350,46 +417,6 @@ class GoogleLightGroup(GoogleLight):
         )
 
 
-def group_google_lights(lights: List[Light]) -> List[Light]:
-    bookcase_group = []
-    diningroom_group = []
-    for light in lights:
-        name = light.get_name()
-        if "bookcase_light_" in name:
-            bookcase_group.append(light)
-        elif "diningroom_light_" in name:
-            diningroom_group.append(light)
-
-    did_bookcase = False
-    did_diningroom = False
-    ret = []
-    for light in lights:
-        name = light.get_name()
-        if "bookcase_light_" in name:
-            if len(bookcase_group) == 4 and not did_bookcase:
-                ret.append(
-                    GoogleLightGroup(
-                        "bookcase_lights",
-                        bookcase_group,
-                        "perm wifi light smart goog dimmer"
-                    )
-                )
-                did_bookcase = True
-        elif "diningroom_light_" in name:
-            if len(diningroom_group) == 2 and not did_diningroom:
-                ret.append(
-                    GoogleLightGroup(
-                        "dining_room_lights",
-                        diningroom_group,
-                        "intermittent wifi light smart goog dimmer"
-                    )
-                )
-                did_diningroom = True
-        else:
-            ret.append(light)
-    return ret
-
-
 class LightingConfig(object):
     """Representation of the smart light device config."""
 
@@ -428,45 +455,45 @@ class LightingConfig(object):
             self.keywords_by_mac[mac] = keywords
             self.names_by_mac[mac] = name
 
-            if "bookcase_light_" in name:
-                bookcase_lights.append(mac)
-            elif "diningroom_light_" in name:
-                diningroom_lights.append(mac)
-            else:
-                self.index_light(name, keywords, mac)
-
-        name = 'bookcase_lights'
-        group = []
-        keywords = 'perm wifi light smart goog dimmer'
-        for b in bookcase_lights:
-            group.append(self.get_light_by_mac(b))
-        self.bookcase_group = GoogleLightGroup(
-            name,
-            group,
-            keywords,
-        )
-        mac = self.bookcase_group.get_mac()
-        self.macs_by_name[name] = mac
-        self._keywords_by_name[name] = keywords
-        self.keywords_by_mac[mac] = keywords
-        self.names_by_mac[mac] = name
-        self.index_light(name, keywords, mac)
-
-        name = 'dining_room_lights'
-        group = []
-        for b in diningroom_lights:
-            group.append(self.get_light_by_mac(b))
-        self.diningroom_group = GoogleLightGroup(
-            name,
-            group,
-            keywords,
-        )
-        mac = self.diningroom_group.get_mac()
-        self.macs_by_name[name] = mac
-        self._keywords_by_name[name] = keywords
-        self.keywords_by_mac[mac] = keywords
-        self.names_by_mac[mac] = name
-        self.index_light(name, keywords, mac)
+#            if "bookcase_light_" in name:
+#                bookcase_lights.append(mac)
+#            elif "diningroom_light_" in name:
+#                diningroom_lights.append(mac)
+#            else:
+            self.index_light(name, keywords, mac)
+
+        name = 'bookcase_lights'
+        group = []
+        keywords = 'perm wifi light smart goog dimmer'
+        for b in bookcase_lights:
+            group.append(self.get_light_by_mac(b))
+        self.bookcase_group = GoogleLightGroup(
+            name,
+            group,
+            keywords,
+        )
+        mac = self.bookcase_group.get_mac()
+        self.macs_by_name[name] = mac
+        self._keywords_by_name[name] = keywords
+        self.keywords_by_mac[mac] = keywords
+        self.names_by_mac[mac] = name
+        self.index_light(name, keywords, mac)
+
+        name = 'dining_room_lights'
+        group = []
+        for b in diningroom_lights:
+            group.append(self.get_light_by_mac(b))
+        self.diningroom_group = GoogleLightGroup(
+            name,
+            group,
+            keywords,
+        )
+        mac = self.diningroom_group.get_mac()
+        self.macs_by_name[name] = mac
+        self._keywords_by_name[name] = keywords
+        self.keywords_by_mac[mac] = keywords
+        self.names_by_mac[mac] = name
+        self.index_light(name, keywords, mac)
 
     def index_light(self, name: str, keywords: str, mac: str) -> None:
         properties = [("name", name)]
@@ -477,14 +504,13 @@ class LightingConfig(object):
                 properties.append((key, value))
             else:
                 tags.add(kw)
-        self.corpus.add_doc(
-            logical_search.Document(
-                docid=mac,
-                tags=tags,
-                properties=properties,
-                reference=None,
-            )
+        light = logical_search.Document(
+            docid=mac,
+            tags=tags,
+            properties=properties,
+            reference=None,
         )
+        self.corpus.add_doc(light)
 
     def __repr__(self) -> str:
         s = "Known devices:\n"
@@ -522,7 +548,7 @@ class LightingConfig(object):
                 light = self.get_light_by_mac(mac)
                 if light is not None:
                     retval.append(light)
-        return group_google_lights(retval)
+        return retval
 
     def get_light_by_mac(self, mac: str) -> Optional[Light]:
         if mac in self.keywords_by_mac:
@@ -532,8 +558,10 @@ class LightingConfig(object):
                 return self.bookcase_group
             elif name == 'dining_room_lights':
                 return self.diningroom_group
-            elif "tplink" in kws.lower():
+            elif 'tplink' in kws.lower():
                 return TPLinkLight(name, mac, kws)
+            elif 'tuya' in kws.lower():
+                return TuyaLight(name, mac, kws)
             else:
                 return GoogleLight(name, mac, kws)
         return None
@@ -551,4 +579,4 @@ class LightingConfig(object):
                     light = self.get_light_by_mac(mac)
                     if light is not None:
                         retval.append(light)
-        return group_google_lights(retval)
+        return retval
index a8030e30812ceb804a6c45ff386a7ab159fa2911..05512b564c2303372ff81660840bfd29dcba942b 100644 (file)
@@ -48,6 +48,23 @@ def prepend(item: Any, lst: List[Any]) -> List[Any]:
     return lst
 
 
+def remove_list_if_one_element(lst: List[Any]) -> Any:
+    """
+    Remove the list and return the 0th element iff its length is one.
+
+    >>> remove_list_if_one_element([1234])
+    1234
+
+    >>> remove_list_if_one_element([1, 2, 3, 4])
+    [1, 2, 3, 4]
+
+    """
+    if len(lst) == 1:
+        return lst[0]
+    else:
+        return lst
+
+
 def population_counts(lst: List[Any]) -> Mapping[Any, int]:
     """
     Return a population count mapping for the list (i.e. the keys are
@@ -61,29 +78,39 @@ def population_counts(lst: List[Any]) -> Mapping[Any, int]:
     return Counter(lst)
 
 
-def most_common_item(lst: List[Any]) -> Any:
+def most_common(lst: List[Any], *, count=1) -> Any:
 
     """
     Return the most common item in the list.  In the case of ties,
     which most common item is returned will be random.
 
-    >>> most_common_item([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4])
+    >>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4])
     3
 
+    >>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4], count=2)
+    [3, 1]
+
     """
-    return population_counts(lst).most_common(1)[0][0]
+    p = population_counts(lst)
+    return remove_list_if_one_element([_[0] for _ in p.most_common()[0:count]])
 
 
-def least_common_item(lst: List[Any]) -> Any:
+def least_common(lst: List[Any], *, count=1) -> Any:
     """
     Return the least common item in the list.  In the case of
     ties, which least common item is returned will be random.
 
-    >>> least_common_item([1, 1, 1, 2, 2, 3, 3, 3, 4])
+    >>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4])
     4
 
+    >>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4], count=2)
+    [4, 2]
+
     """
-    return population_counts(lst).most_common()[-1][0]
+    p = population_counts(lst)
+    mc = p.most_common()[-count:]
+    mc.reverse()
+    return remove_list_if_one_element([_[0] for _ in mc])
 
 
 def dedup_list(lst: List[Any]) -> List[Any]:
@@ -104,6 +131,46 @@ def uniq(lst: List[Any]) -> List[Any]:
     return dedup_list(lst)
 
 
+def contains_duplicates(lst: List[Any]) -> bool:
+    """
+    Does the list contian duplicate elements or not?
+
+    >>> lst = [1, 2, 1, 3, 3, 4, 4, 5, 6, 1, 3, 4]
+    >>> contains_duplicates(lst)
+    True
+
+    >>> contains_duplicates(dedup_list(lst))
+    False
+
+    """
+    seen = set()
+    for _ in lst:
+        if _ in seen:
+            return True
+        seen.add(_)
+    return False
+
+
+def all_unique(lst: List[Any]) -> bool:
+    """
+    Inverted alias for contains_duplicates.
+    """
+    return not contains_duplicates(lst)
+
+
+def transpose(lst: List[Any]) -> List[Any]:
+    """
+    Transpose a list of lists.
+
+    >>> lst = [[1, 2], [3, 4], [5, 6]]
+    >>> transpose(lst)
+    [[1, 3, 5], [2, 4, 6]]
+
+    """
+    transposed = zip(*lst)
+    return [list(_) for _ in transposed]
+
+
 def ngrams(lst: Sequence[Any], n):
     """
     Return the ngrams in the sequence.
diff --git a/site_config.py b/site_config.py
new file mode 100644 (file)
index 0000000..81185bf
--- /dev/null
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+
+from dataclasses import dataclass
+import logging
+import platform
+from typing import Optional
+
+import config
+
+logger = logging.getLogger(__name__)
+args = config.add_commandline_args(
+    f'({__file__})',
+    'Args related to __file__'
+)
+
+args.add_argument(
+    '--site_config_location',
+    default='AUTO',
+    const='AUTO',
+    nargs='?',
+    choices=('HOUSE', 'CABIN', 'AUTO'),
+    help='Where are we, HOUSE or CABIN?'
+)
+
+
+@dataclass
+class SiteConfig(object):
+    network: str
+    network_netmask: str
+    network_router_ip: str
+
+
+def get_location():
+    location = config.config['site_config_location']
+    if location == 'AUTO':
+        hostname = platform.node()
+        if '.house' in hostname:
+            location = 'HOUSE'
+        elif '.cabin' in hostname:
+            location = 'CABIN'
+        else:
+            raise Exception(f'Unknown hostname {hostname}, help.')
+    return location
+
+
+def get_config():
+    location = get_location()
+    if location == 'HOUSE':
+        return SiteConfig(
+            network = '10.0.0.0/24',
+            network_netmask = '255.255.255.0',
+            network_router_ip = '10.0.0.1',
+        )
+    elif location == 'CABIN':
+        return SiteConfig(
+            network = '192.168.0.0/24',
+            network_netmask = '255.255.255.0',
+            network_router_ip = '192.168.0.1',
+        )
+    else:
+        raise Exception('Unknown site location')
index a6a2da3155fcf312e9a9bc21ecd94f26141b4fcb..9a38d25c49cccddceec4da06ee8bbfe8133749aa 100644 (file)
@@ -791,6 +791,9 @@ def extract_mac_address(in_str: Any, *, separator: str = ":") -> Optional[str]:
     >>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F')
     '34:29:8F:12:0D:2F'
 
+    >>> extract_mac_address('? (10.0.0.30) at d8:5d:e2:34:54:86 on em0 expires in 1176 seconds [ethernet]')
+    'd8:5d:e2:34:54:86'
+
     """
     if not is_full_string(in_str):
         return None
@@ -1500,6 +1503,19 @@ def from_bitstring(bits: str, encoding='utf-8', errors='surrogatepass') -> str:
     return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0'
 
 
+def ip_v4_sort_key(txt: str) -> str:
+    """Turn an IPv4 address into a tuple for sorting purposes.
+
+    >>> ip_v4_sort_key('10.0.0.18')
+    (10, 0, 0, 18)
+
+    """
+    if not is_ip_v4(txt):
+        print(f"not IP: {txt}")
+        return None
+    return tuple([int(x) for x in txt.split('.')])
+
+
 if __name__ == '__main__':
     import doctest
     doctest.testmod()