from dataclasses import dataclass
import datetime
import json
+import logging
import os
from typing import List
import urllib.request
import list_utils
import persistent
+logger = logging.getLogger(__name__)
+
cfg = config.add_commandline_args(
f'Cached Weather Data List ({__file__})',
'Arguments controlling cached weather data',
date: datetime.date # The date
high: float # The predicted high in F
low: float # The predicted low in F
+ precipitation_inchs: float # Number of inches of precipitation / day
conditions: List[str] # Conditions per ~3h window
most_common_condition: str # The most common condition
icon: str # An icon to represent it
highs = {}
lows = {}
conditions = {}
+ precip = {}
param = "id=5786882" # Bellevue, WA
key = "c0b160c49743622f62a9cd3cda0270b3"
www = urllib.request.urlopen(
response = www.read()
www.close()
parsed_json = json.loads(response)
+ logger.debug(parsed_json)
dt = datetime.datetime.fromtimestamp(parsed_json["dt"]).date()
dates.add(dt)
condition = parsed_json["weather"][0]["main"]
icon = icon_by_condition.get(condition, '?')
+ p = 0.0
+ if 'rain' in parsed_json:
+ if '3h' in parsed_json['rain']:
+ p += float(parsed_json['rain']['3h'])
+ elif '1h' in parsed_json['rain']:
+ p += float(parsed_json['rain']['1h'])
+ if 'snow' in parsed_json:
+ if '3h' in parsed_json['snow']:
+ p += float(parsed_json['snow']['3h'])
+ elif '1h' in parsed_json['snow']:
+ p += float(parsed_json['snow']['1h'])
if dt == now.date() and now.hour > 18 and condition == 'Clear':
icon = '🌙'
self.weather_data[dt] = WeatherData(
date = dt,
high = float(parsed_json["main"]["temp_max"]),
low = float(parsed_json["main"]["temp_min"]),
+ precipitation_inchs = p / 25.4,
conditions = [condition],
most_common_condition = condition,
icon = icon,
response = www.read()
www.close()
parsed_json = json.loads(response)
+ logger.debug(parsed_json)
count = parsed_json["cnt"]
for x in range(count):
data = parsed_json["list"][x]
highs[dt] = None
lows[dt] = None
conditions[dt] = []
- temp = data["main"]["temp"]
- if highs[dt] is None or temp > highs[dt]:
- highs[dt] = temp
- if lows[dt] is None or temp < lows[dt]:
- lows[dt] = temp
+ for temp in (
+ data["main"]["temp"],
+ data['main']['temp_min'],
+ data['main']['temp_max'],
+ ):
+ if highs[dt] is None or temp > highs[dt]:
+ highs[dt] = temp
+ if lows[dt] is None or temp < lows[dt]:
+ lows[dt] = temp
cond = data["weather"][0]["main"]
+ precip[dt] = 0.0
+ if 'rain' in parsed_json:
+ if '3h' in parsed_json['rain']:
+ precip[dt] += float(parsed_json['rain']['3h'])
+ elif '1h' in parsed_json['rain']:
+ precip[dt] += float(parsed_json['rain']['1h'])
+ if 'snow' in parsed_json:
+ if '3h' in parsed_json['snow']:
+ precip[dt] += float(parsed_json['snow']['3h'])
+ elif '1h' in parsed_json['snow']:
+ precip[dt] += float(parsed_json['snow']['1h'])
conditions[dt].append(cond)
today = datetime_utils.now_pacific().date()
date = dt,
high = highs[dt],
low = lows[dt],
+ precipitation_inchs = precip[dt] / 25.4,
conditions = conditions[dt],
most_common_condition = most_common_condition,
icon = icon
def __setitem__(self, key, value):
if key in self:
- self.inverse[self[key]].remove(key)
+ old_value = self[key]
+ self.inverse[old_value].remove(key)
super().__setitem__(key, value)
self.inverse.setdefault(value, []).append(key)
def __delitem__(self, key):
- self.inverse.setdefault(self[key], []).remove(key)
- if self[key] in self.inverse and not self.inverse[self[key]]:
- del self.inverse[self[key]]
+ value = self[key]
+ self.inverse.setdefault(value, []).remove(key)
+ if value in self.inverse and not self.inverse[value]:
+ del self.inverse[value]
super().__delitem__(key)
class Node(object):
def __init__(self, value: Any) -> None:
+ """
+ Note: value can be anything as long as it is comparable.
+ Check out @functools.total_ordering.
+ """
self.left = None
self.right = None
self.value = value
return node
elif (value < node.value and node.left is not None):
return self._find(value, node.left)
- else:
- assert value > node.value
- if node.right is not None:
- return self._find(value, node.right)
+ elif (value > node.value and node.right is not None):
+ return self._find(value, node.right)
return None
def _parent_path(self, current: Node, target: Node):
def parent_path(self, node: Node) -> Optional[List[Node]]:
"""Return a list of nodes representing the path from
- the tree's root to the node argument.
+ the tree's root to the node argument. If the node does
+ not exist in the tree for some reason, the last element
+ on the path will be None but the path will indicate the
+ ancestor path of that node were it inserted.
>>> t = BinarySearchTree()
>>> t.insert(50)
12
4
+ >>> del t[4]
+ >>> for x in t.parent_path(n):
+ ... if x is not None:
+ ... print(x.value)
+ ... else:
+ ... print(x)
+ 50
+ 25
+ 12
+ None
+
"""
return self._parent_path(self.root, node)
"""
Delete an item from the tree and preserve the BST property.
- 50
- / \
- 25 75
- / / \
- 22 66 85
- /
- 13
-
-
>>> t = BinarySearchTree()
>>> t.insert(50)
>>> t.insert(75)
>>> t.insert(22)
>>> t.insert(13)
>>> t.insert(85)
+ >>> t
+ 50
+ ├──25
+ │ └──22
+ │ └──13
+ └──75
+ ├──66
+ └──85
>>> for value in t.iterate_inorder():
... print(value)
50
66
85
+ >>> t
+ 50
+ ├──25
+ └──85
+ └──66
>>> t.__delitem__(99)
False
def repr_traverse(self, padding: str, pointer: str, node: Node, has_right_sibling: bool) -> str:
if node is not None:
- self.viz += f'\n{padding}{pointer}{node.value}'
+ viz = f'\n{padding}{pointer}{node.value}'
if has_right_sibling:
padding += "│ "
else:
else:
pointer_left = "└──"
- self.repr_traverse(padding, pointer_left, node.left, node.right is not None)
- self.repr_traverse(padding, pointer_right, node.right, False)
+ viz += self.repr_traverse(padding, pointer_left, node.left, node.right is not None)
+ viz += self.repr_traverse(padding, pointer_right, node.right, False)
+ return viz
+ return ""
def __repr__(self):
"""
if self.root is None:
return ""
- self.viz = f'{self.root.value}'
+ ret = f'{self.root.value}'
pointer_right = "└──"
if self.root.right is None:
pointer_left = "└──"
else:
pointer_left = "├──"
- self.repr_traverse('', pointer_left, self.root.left, self.root.left is not None)
- self.repr_traverse('', pointer_right, self.root.right, False)
- return self.viz
+ ret += self.repr_traverse('', pointer_left, self.root.left, self.root.left is not None)
+ ret += self.repr_traverse('', pointer_right, self.root.right, False)
+ return ret
if __name__ == '__main__':
self.root = {}
self.end = "~END~"
self.length = 0
+ self.viz = ''
def insert(self, item: Sequence[Any]):
"""
return None
return [x for x in node if x != self.end]
- def repr_recursive(self, node, delimiter):
+ def repr_fancy(self, padding: str, pointer: str, parent: str, node: Any, has_sibling: bool):
+ if node is None:
+ return
+ if node is not self.root:
+ ret = f'\n{padding}{pointer}'
+ if has_sibling:
+ padding += '│ '
+ else:
+ padding += ' '
+ else:
+ ret = f'{pointer}'
+
+ child_count = 0
+ for child in node:
+ if child != self.end:
+ child_count += 1
+
+ for child in node:
+ if child != self.end:
+ if child_count > 1:
+ pointer = "├──"
+ has_sibling = True
+ else:
+ pointer = "└──"
+ has_sibling = False
+ pointer += f'{child}'
+ child_count -= 1
+ ret += self.repr_fancy(padding, pointer, node, node[child], has_sibling)
+ return ret
+
+ def repr_brief(self, node, delimiter):
"""
A friendly string representation of the contents of the Trie.
>>> t.insert([10, 0, 0, 2])
>>> t.insert([10, 10, 10, 1])
>>> t.insert([10, 10, 10, 2])
- >>> t.repr_recursive(t.root, '.')
+ >>> t.repr_brief(t.root, '.')
'10.[0.0.[1, 2], 10.10.[1, 2]]'
- >>> print(t)
- 10[00[1, 2], 1010[1, 2]]
"""
child_count = 0
for child in node:
if child != self.end:
child_count += 1
- child_rep = self.repr_recursive(node[child], delimiter)
+ child_rep = self.repr_brief(node[child], delimiter)
if len(child_rep) > 0:
my_rep += str(child) + delimiter + child_rep + ", "
else:
def __repr__(self):
"""
A friendly string representation of the contents of the Trie. Under
- the covers uses repr_recursive with no delimiter
+ the covers uses repr_fancy.
>>> t = Trie()
>>> t.insert([10, 0, 0, 1])
>>> t.insert([10, 10, 10, 1])
>>> t.insert([10, 10, 10, 2])
>>> print(t)
- 10[00[1, 2], 1010[1, 2]]
+ *
+ └──10
+ ├──0
+ │ └──0
+ │ ├──1
+ │ └──2
+ └──10
+ └──10
+ ├──1
+ └──2
"""
- return self.repr_recursive(self.root, '')
+ return self.repr_fancy('', '*', self.root, self.root, False)
if __name__ == '__main__':
'--config_dump',
default=False,
action='store_true',
- help='Display the global configuration on STDERR at program startup.',
+ help='Display the global configuration (possibly derived from multiple sources) on STDERR at program startup.',
)
group.add_argument(
'--config_savefile',
type=str,
metavar='FILENAME',
default=None,
- help='Populate config file compatible with --config_loadfile to save config for later use.',
+ help='Populate config file compatible with --config_loadfile to save global config for later use.',
)
def parse(entry_module: Optional[str]) -> Dict[str, Any]:
- """Main program should call this early in main()"""
+ """Main program should call this early in main(). Note that the
+ bootstrap.initialize wrapper takes care of this automatically.
+
+ """
global config_parse_called
if config_parse_called:
return config
reordered_action_groups.insert(0, group)
args._action_groups = reordered_action_groups
- # Examine the environment variables that match known flags. For a
- # flag called --example_flag the corresponding environment
+ # Examine the environment for variables that match known flags.
+ # For a flag called --example_flag the corresponding environment
# variable would be called EXAMPLE_FLAG.
usage_message = args.format_usage()
optional = False
import logging
from typing import NamedTuple
+import sys
import requests
import speech_recognition as sr # type: ignore
logger.exception(e)
logger.warning('Unable to parse Google assistant\'s response.')
audio_transcription = None
+ return GoogleResponse(
+ success=success,
+ response=response,
+ audio_url=audio,
+ audio_transcription=audio_transcription,
+ )
else:
- logger.error(
- f'HTTP request to {url} with {payload} failed; code {r.status_code}'
+ message = f'HTTP request to {url} with {payload} failed; code {r.status_code}'
+ logger.error(message)
+ return GoogleResponse(
+ success=False,
+ response=message,
+ audio_url=audio,
+ audio_transcription=audio_transcription,
)
- return GoogleResponse(
- success=success,
- response=response,
- audio_url=audio,
- audio_transcription=audio_transcription,
- )
+ sys.exit(-1)
import nltk
from nltk.stem import PorterStemmer
+import decorator_utils
import string_utils
logger = logging.getLogger(__name__)
+@decorator_utils.singleton
class ProfanityFilter(object):
def __init__(self):
self.bad_words = set([
'blonde action',
'blow j',
'blow job',
+ 'blowjob',
'blow my',
'blow me',
'blow ourselv',
#!/usr/bin/env python3
+import base64
import contextlib
import datetime
import io
return sprintf(txt.format(**values), end='')
+def to_ascii(x: str):
+ """Encode as ascii bytes string.
+
+ >>> to_ascii('test')
+ b'test'
+
+ >>> to_ascii(b'1, 2, 3')
+ b'1, 2, 3'
+
+ """
+ if type(x) is str:
+ return x.encode('ascii')
+ if type(x) is bytes:
+ return x
+ raise Exception('to_ascii works with strings and bytes')
+
+
+def to_base64(txt: str, *, encoding='utf-8', errors='surrogatepass') -> str:
+ """Encode txt and then encode the bytes with a 64-character
+ alphabet. This is compatible with uudecode.
+
+ >>> to_base64('hello?')
+ b'aGVsbG8/\\n'
+
+ """
+ return base64.encodebytes(txt.encode(encoding, errors))
+
+
+def is_base64(txt: str) -> bool:
+ """Determine whether a string is base64 encoded (with Python's standard
+ base64 alphabet which is the same as what uuencode uses).
+
+ >>> is_base64('test') # all letters in the b64 alphabet
+ True
+
+ >>> is_base64('another test, how do you like this one?')
+ False
+
+ >>> is_base64(b'aGVsbG8/\\n') # Ending newline is ok.
+ True
+
+ """
+ a = string.ascii_uppercase + string.ascii_lowercase + string.digits + '+/'
+ alphabet = set(a.encode('ascii'))
+ for char in to_ascii(txt.strip()):
+ if char not in alphabet:
+ return False
+ return True
+
+
+def from_base64(b64: str, encoding='utf-8', errors='surrogatepass') -> str:
+ """Convert base64 encoded string back to normal strings.
+
+ >>> from_base64(b'aGVsbG8/\\n')
+ 'hello?'
+
+ """
+ return base64.decodebytes(b64).decode(encoding, errors)
+
+
+def chunk(txt: str, chunk_size):
+ """Chunk up a string.
+
+ >>> ' '.join(chunk('010011011100010110101010101010101001111110101000', 8))
+ '01001101 11000101 10101010 10101010 10011111 10101000'
+
+ """
+ if len(txt) % chunk_size != 0:
+ logger.warning(
+ f'String to chunk\'s length ({len(txt)} is not an even multiple of chunk_size ({chunk_size})')
+ for x in range(0, len(txt), chunk_size):
+ yield txt[x:x+chunk_size]
+
+
+def to_bitstring(txt: str, *, delimiter='', encoding='utf-8', errors='surrogatepass') -> str:
+ """Encode txt and then chop it into bytes. Note: only bitstrings
+ with delimiter='' are interpretable by from_bitstring.
+
+ >>> to_bitstring('hello?')
+ '011010000110010101101100011011000110111100111111'
+
+ >>> to_bitstring('test', delimiter=' ')
+ '01110100 01100101 01110011 01110100'
+
+ """
+ bits = bin(
+ int.from_bytes(
+ txt.encode(encoding, errors),
+ 'big'
+ )
+ )
+ bits = bits[2:]
+ return delimiter.join(chunk(bits.zfill(8 * ((len(bits) + 7) // 8)), 8))
+
+
+def is_bitstring(txt: str) -> bool:
+ """Is this a bitstring?
+
+ >>> is_bitstring('011010000110010101101100011011000110111100111111')
+ True
+
+ >>> is_bitstring('1234')
+ False
+
+ """
+ return is_binary_integer_number(f'0b{txt}')
+
+
+def from_bitstring(bits: str, encoding='utf-8', errors='surrogatepass') -> str:
+ """Convert from bitstring back to bytes then decode into a str.
+
+ >>> from_bitstring('011010000110010101101100011011000110111100111111')
+ 'hello?'
+
+ """
+ n = int(bits, 2)
+ return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0'
+
+
if __name__ == '__main__':
import doctest
doctest.testmod()