--- /dev/null
+#!/usr/bin/env python3
+
+import datetime
+import re
+from typing import Dict, List, Optional
+import xml
+
+from dateutil.parser import parse
+
+import generic_news_rss_renderer as gnrss
+
+
+class bellevue_city_calendar_renderer(gnrss.generic_news_rss_renderer):
+ """Read the Bellevue City Calendar feed."""
+
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
+ super(bellevue_city_calendar_renderer, self).__init__(
+ name_to_timeout_dict, feed_site, feed_uris, page_title
+ )
+ self.debug = True
+
+ def debug_prefix(self) -> str:
+ return f"bellevue_calendar({self.page_title})"
+
+ def get_headlines_page_prefix(self) -> str:
+ return "bellevue-calendar"
+
+ def get_details_page_prefix(self) -> str:
+ return "bellevue-calendar-details"
+
+ def should_use_https(self) -> bool:
+ return True
+
+ def get_event_time(self, item: xml.etree.ElementTree.Element) -> Optional[datetime.datetime]:
+ return parse(self.find_pubdate(item))
+
+ def find_pubdate(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
+ descr = item.findtext("description")
+ if descr is None:
+ return None
+ m = re.search(r'time datetime="([^"]+)"', descr)
+ if m is None:
+ return None
+ return m.group(1)
+
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ if "City Council" in title:
+ return False
+ if "City Offices Closed" in title:
+ return False
+ if "Regular Meeting" in title:
+ return False
+ if "Commission" in title:
+ return False
+ date = self.get_event_time(item)
+ if date is None:
+ return False
+ tzinfo = date.tzinfo
+ now = datetime.datetime.now(tzinfo)
+ delta = (now - date).total_seconds() / (60 * 60 * 24)
+ return delta < 0
+
+ def do_details(self) -> bool:
+ return False
+
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ return False
+
+
+# Test
+#x = bellevue_city_calendar_renderer(
+# {"Fetch News" : 1,
+# "Shuffle News" : 1},
+# "bellevuewa.gov",
+# [ "/calendar/events.xml" ],
+# "Test" )
+#if x.fetch_news() == 0:
+# print("Error fetching news, no items fetched.")
+#else:
+# x.shuffle_news()
import re
from typing import List, Dict
import xml
+import xml.etree.ElementTree as ET
import generic_news_rss_renderer as gnrss
self.debug = True
def debug_prefix(self) -> str:
- return "bellevue_reporter(%s)" % (self.page_title)
+ return f"bellevue_reporter({self.page_title})"
def get_headlines_page_prefix(self) -> str:
return "bellevue-reporter"
def should_use_https(self) -> bool:
return True
- def munge_description(self, description: str) -> str:
+ def munge_description(self, description: str, item: ET.Element) -> str:
description = re.sub("<[^>]+>", "", description)
description = re.sub(
"Bellevue\s+Reporter\s+Bellevue\s+Reporter", "", description
def looks_like_review(title: str, description: str) -> bool:
return "review" in title or "Review" in title
+ @staticmethod
+ def looks_like_spam(title: str, description: str) -> bool:
+ return (
+ 'marketplace' in description
+ or 'national-marketplace' in description
+ or re.search('[Ww]eed', title) is not None
+ or re.search('[Cc]annabis', title) is not None
+ or re.search('[Cc]annabis', description) is not None
+ or 'THC' in title
+ or re.search('[Ll]ose [Ww]eight', title) is not None
+ or re.search('[Ll]ose [Ww]eight', description) is not None
+ )
+
def item_is_interesting_for_headlines(
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
+ unfiltered_description = item.findtext("description")
if self.is_item_older_than_n_days(item, 10):
- self.debug_print("%s: is too old!" % title)
+ self.debug_print(f'{title}: is too old!')
+ return False
+ if bellevue_reporter_rss_renderer.looks_like_spam(title, unfiltered_description):
+ self.debug_print(f'{title}: looks like spam')
return False
if bellevue_reporter_rss_renderer.looks_like_football(title, description):
- self.debug_print("%s: looks like it's about football." % title)
+ self.debug_print(f'{title}: looks like it\'s about football.')
return False
if bellevue_reporter_rss_renderer.looks_like_review(title, description):
- self.debug_print("%s: looks like bullshit." % title)
+ self.debug_print(f'{title}: looks like a review.')
return False
return True
def item_is_interesting_for_article(
self, title: str, description: str, item: xml.etree.ElementTree.Element
) -> bool:
+ unfiltered_description = item.findtext("description")
if self.is_item_older_than_n_days(item, 10):
- self.debug_print("%s: is too old!" % title)
+ self.debug_print(f'{title}: is too old!')
+ return False
+ if bellevue_reporter_rss_renderer.looks_like_spam(title, unfiltered_description):
+ self.debug_print(f'{title}: looks like spam')
return False
if bellevue_reporter_rss_renderer.looks_like_football(title, description):
- self.debug_print("%s: looks like it's about football." % title)
+ self.debug_print(f'{title}: looks like it\'s about football.')
return False
if bellevue_reporter_rss_renderer.looks_like_review(title, description):
- self.debug_print("%s: looks like bullshit." % title)
+ self.debug_print(f'{title}: looks like a review.')
return False
return True
self.triggers_in_the_past_seven_min = {
"driveway": 0,
"frontdoor": 0,
+ "doorbell": 0,
"cabin_driveway": 0,
}
self.last_trigger_timestamp = {
"driveway": 0,
"frontdoor": 0,
+ "doorbell": 0,
"cabin_driveway": 0,
}
def choose_priority(self, camera: str, age: int) -> int:
"""Based on the camera name and last trigger age, compute priority."""
base_priority_by_camera = {
- "driveway": 1,
+ "driveway": 3,
"frontdoor": 2,
- "cabin_driveway": 1,
+ "doorbell": 1,
+ "cabin_driveway": 3,
}
priority = base_priority_by_camera[camera]
if age < 10:
"""Return a list of triggered pages with priorities."""
triggers = []
num_cameras_with_recent_triggers = 0
- camera_list = ["driveway", "frontdoor", "cabin_driveway"]
+ camera_list = ["driveway", "frontdoor", "doorbell", "cabin_driveway"]
now = time.time()
try:
return (page, triggered)
# Always show the clock in the middle of the night.
- elif now.hour < 7:
+ elif now.hour < 6:
for page in self.pages:
if "clock" in page:
return (page, False)
def get_details_page_prefix(self) -> str:
return f"cnn-details-{self.page_title}"
- def munge_description(self, description: str) -> str:
+ def munge_description(self, description: str, item: xml.etree.ElementTree.Element) -> str:
description = re.sub("[Rr]ead full story for latest details.", "", description)
description = re.sub("<[^>]+>", "", description)
return description
seconds_per_day = seconds_per_hour * 24
myq_pagename = "myq_4_300.html"
+internal_stats_pagename = 'internal_stats_1_1000.html'
gcal_imminent_pagename = "hidden/gcal-imminent_0_none.html"
import constants
import os
+from uuid import uuid4
class file_writer:
"""Helper context to write a pages file."""
def __init__(self, filename: str, *, transformations=[]):
+ temp = "temp-" + str(uuid4())
+ self.temp_filename = os.path.join(constants.pages_dir, temp)
self.full_filename = os.path.join(constants.pages_dir, filename)
self.xforms = [file_writer.remove_tricky_unicode]
self.xforms.extend(transformations)
self.f.write(data.encode("utf-8"))
def __enter__(self):
- self.f = open(self.full_filename, "wb")
+ self.f = open(self.temp_filename, "wb")
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
self.close()
+ cmd = f'/bin/mv -f {self.temp_filename} "{self.full_filename}"'
+ os.system(cmd)
+ print(cmd)
def done(self):
self.close()
# Test
-# def toupper(x):
+#def toupper(x):
# return x.upper()
-#
-# with file_writer("test", transformations=[toupper]) as fw:
-# fw.write(u"Another test!!")
+
+#with file_writer("test", transformations=[toupper]) as fw:
+# fw.write(u"Another test!!")
contents of several Google calendars."""
import datetime
+import functools
+import os
+import time
+from typing import Any, Dict, List, Optional, Tuple
+
+from dateutil.parser import parse
import gdata # type: ignore
import gdata_oauth
from oauth2client.client import AccessTokenRefreshError # type: ignore
-import os
-import time
-from typing import Dict, List, Optional, Tuple
+import pytz
import constants
import file_writer
import globals
import renderer
-import secrets
+import kiosk_secrets as secrets
class gcal_renderer(renderer.debuggable_abstaining_renderer):
]
)
+ @functools.total_ordering
class comparable_event(object):
"""A helper class to sort events."""
) -> None:
if start_time is None:
assert(end_time is None)
+ else:
+ assert(isinstance(start_time, datetime.datetime))
+ if end_time is not None:
+ assert(isinstance(end_time, datetime.datetime))
self.start_time = start_time
self.end_time = end_time
self.summary = summary
that.calendar,
)
- def __str__(self) -> str:
+ def __eq__(self, that) -> bool:
+ return (
+ self.start_time == that.start_time and
+ self.end_time == that.end_time and
+ self.summary == that.summary and
+ self.calendar == that.calendar
+ )
+
+ def __repr__(self) -> str:
return "[%s] %s" % (self.timestamp(), self.friendly_name())
def friendly_name(self) -> str:
return "<B>%s</B>" % name
def timestamp(self) -> str:
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
if self.start_time is None:
return "None"
- elif self.start_time.hour == 0:
- return datetime.datetime.strftime(self.start_time, "%a %b %d %Y")
+ elif (
+ self.start_time.hour == 0 and
+ self.start_time.minute == 0 and
+ self.start_time.second == 0
+ ):
+ if self.start_time.year == now.year:
+ return datetime.datetime.strftime(
+ self.start_time,
+ "%a %b %d"
+ )
+ else:
+ return datetime.datetime.strftime(
+ self.start_time,
+ "%a %b %d, %Y"
+ )
else:
- return datetime.datetime.strftime(
- self.start_time, "%a %b %d %Y %H:%M%p"
- )
+ dt = self.start_time
+ zone = dt.tzinfo
+ local_dt = dt.replace(tzinfo=zone).astimezone(tz=pytz.timezone('US/Pacific'))
+ if local_dt.year == now.year:
+ return datetime.datetime.strftime(
+ local_dt, "%a %b %d %I:%M%p"
+ )
+ else:
+ return datetime.datetime.strftime(
+ local_dt, "%a %b %d, %Y %I:%M%p"
+ )
def __init__(
self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
raise Exception("Unexpected operation")
def get_min_max_timewindow(self) -> Tuple[str, str]:
- now = datetime.datetime.now()
- _time_min = now - datetime.timedelta(1)
- _time_max = now + datetime.timedelta(95)
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
+ _time_min = now - datetime.timedelta(hours=6)
+ _time_max = now + datetime.timedelta(days=95)
time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ")
time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ")
self.debug_print(f"time_min is {time_min}")
return (time_min, time_max)
@staticmethod
- def parse_date(date_str: str) -> Optional[datetime.datetime]:
- retval = None
- try:
- _ = date_str.get("date")
- if _:
- retval = datetime.datetime.strptime(_, "%Y-%m-%d")
- else:
- _ = date_str.get("dateTime")
- if _:
- retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
- return retval
- except:
- pass
+ def parse_date(date: Any) -> Optional[datetime.datetime]:
+ if isinstance(date, datetime.datetime):
+ return date
+ elif isinstance(date, dict):
+ if 'dateTime' in date:
+ d = date['dateTime']
+ dt = parse(d)
+ if dt.tzinfo is None:
+ dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific'))
+ return dt
+ elif 'date' in date:
+ d = date['date']
+ dt = datetime.datetime.strptime(d, '%Y-%m-%d')
+ dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific'))
+ return dt
+ print(f'Not sure what to do with this {date} ({type(date)}), help?!')
return None
def get_events_from_interesting_calendars(
)
for event in events["items"]:
summary = event["summary"]
- self.debug_print(
- f" ... event '{summary}' ({event['start']} to {event['end']}"
- )
start = gcal_renderer.parse_date(event["start"])
end = gcal_renderer.parse_date(event["end"])
+ self.debug_print(
+ f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})"
+ )
if start is not None and end is not None:
+ self.debug_print(f' ... adding {summary} to sortable_events')
sortable_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
or "Holidays" in calendar["summary"]
or "Countdown" in summary
):
- self.debug_print(" ... event is countdown worthy!")
+ self.debug_print(f" ... adding {summary} to countdown_events")
countdown_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
) = self.get_events_from_interesting_calendars(time_min, time_max)
self.sortable_events.sort()
with file_writer.file_writer("gcal_3_86400.html") as f:
- f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
- f.write("<center><table width=96%>\n")
+ f.write(
+f"""
+<h1>Upcoming Calendar Events:</h1>
+<hr>
+<center>
+<table width=96% style="border-collapse: collapse;">
+"""
+ )
upcoming_sortable_events = self.sortable_events[:12]
- for event in upcoming_sortable_events:
+ for n, event in enumerate(upcoming_sortable_events):
+ self.debug_print(f'{n}/12: {event.friendly_name()} / {event.calendar}')
+ if n % 2 == 0:
+ color = "#c6b0b0"
+ else:
+ color = "#eeeeee"
f.write(
- f"""
-<tr>
- <td style="padding-right: 1em;">
- {event.timestamp()}
- </td>
- <td style="padding-left: 1em;">
- {event.friendly_name()}
- </td>
-</tr>\n"""
+f"""
+ <tr>
+ <td style="margin: 0; padding: 0; background: {color};">
+ {event.timestamp()}
+ </td>
+ <td style="margin: 0; padding: 0; background: {color};">
+ {event.friendly_name()}
+ </td>
+ </tr>
+"""
)
f.write("</table></center>\n")
self.countdown_events.sort()
with file_writer.file_writer("countdown_3_7200.html") as g:
g.write("<h1>Countdowns:</h1><hr><ul>\n")
- now = datetime.datetime.now()
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
upcoming_countdown_events = self.countdown_events[:12]
count = 0
timestamps = {}
return True
except (gdata.service.RequestError, AccessTokenRefreshError):
print("********* TRYING TO REFRESH GCAL CLIENT *********")
- self.oauth.refresh_token()
- self.client = self.oauth.calendar_service()
+# self.oauth.refresh_token()
+# self.client = self.oauth.calendar_service()
return False
except:
raise
with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
f.write("<center><table width=99%>\n")
- now = datetime.datetime.now()
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
count = 0
for event in self.sortable_events:
eventstamp = event.start_time
# Test
-# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
-# x = gcal_renderer(
-# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
-# oauth)
-# x.periodic_render("Render Upcoming Events")
+#oauth = gdata_oauth.OAuth(secrets.google_client_secret)
+#x = gcal_renderer(
+# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+# oauth)
+#x.periodic_render("Render Upcoming Events")
# https://developers.google.com/google-apps/calendar/v3/reference/calendars
# https://developers.google.com/picasa-web/
+import logging
+import os
+import pickle
import sys
import urllib.request, urllib.parse, urllib.error
-try:
- import http.client # python2
-except ImportError:
- import http.client # python3
-import os.path
-import json
-import time
-from typing import Dict, Optional
-from oauth2client.client import OAuth2Credentials # type: ignore
-import gdata.calendar.service # type: ignore
-import gdata.docs.service # type: ignore
-import gdata.photos.service, gdata.photos # type: ignore
-from googleapiclient.discovery import build # type: ignore
-import httplib2 # type: ignore
-from googleapiclient.discovery import build
-import datetime
-import ssl
+from apiclient.discovery import build
+from google_auth_oauthlib.flow import InstalledAppFlow
+
+
+logger = logging.getLogger(__file__)
class OAuth:
- def __init__(self, client_id: str, client_secret: str) -> None:
- print("gdata: initializing oauth token...")
- self.client_id = client_id
- self.client_secret = client_secret
- self.user_code: Optional[str] = None
- # print 'Client id: %s' % (client_id)
- # print 'Client secret: %s' % (client_secret)
- self.token: Optional[Dict] = None
- self.device_code = None
- self.verfication_url = None
- self.token_file = "client_secrets.json"
- self.scope = [
- #'https://www.googleapis.com/auth/calendar',
- #'https://www.googleapis.com/auth/drive',
- #'https://docs.google.com/feeds',
- #'https://www.googleapis.com/auth/calendar.readonly',
- #'https://picasaweb.google.com/data/',
- "https://www.googleapis.com/auth/photoslibrary.readonly",
- #'http://picasaweb.google.com/data/',
- #'https://www.google.com/calendar/feeds/',
- ]
- self.host = "accounts.google.com"
- self.reset_connection()
- self.load_token()
- self.last_action = 0.0
- self.ssl_ctx: Optional[ssl.SSLContext] = None
-
- # this setup is isolated because it eventually generates a BadStatusLine
- # exception, after which we always get httplib.CannotSendRequest errors.
- # When this happens, we try re-creating the exception.
- def reset_connection(self) -> None:
- self.ssl_ctx = ssl.create_default_context(cafile="/usr/local/etc/ssl/cert.pem")
- http.client.HTTPConnection.debuglevel = 2
- self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
-
- def load_token(self) -> None:
- token = None
- if os.path.isfile(self.token_file):
- f = open(self.token_file)
- json_token = f.read()
- self.token = json.loads(json_token)
- f.close()
-
- def save_token(self) -> None:
- f = open(self.token_file, "w")
- f.write(json.dumps(self.token))
- f.close()
-
- def has_token(self) -> bool:
- if self.token is not None:
- print("gdata: we have a token!")
- else:
- print("gdata: we have no token.")
- return self.token is not None
-
- def get_user_code(self) -> Optional[str]:
- self.conn.request(
- "POST",
- "/o/oauth2/device/code",
- urllib.parse.urlencode(
- {"client_id": self.client_id, "scope": " ".join(self.scope)}
- ),
- {"Content-type": "application/x-www-form-urlencoded"},
- )
- response = self.conn.getresponse()
- if response.status == 200:
- data = json.loads(response.read())
- self.device_code = data["device_code"]
- self.user_code = data["user_code"]
- self.verification_url = data["verification_url"]
- self.retry_interval = data["interval"]
- else:
- self.user_code = None
- print(f"gdata: {response.status}")
- print(response.read())
- sys.exit(-1)
- return self.user_code
-
- def get_new_token(self) -> None:
- # call get_device_code if not already set
- if self.user_code is None:
- print("gdata: getting user code")
- self.get_user_code()
-
- while self.token is None:
- self.conn.request(
- "POST",
- "/o/oauth2/token",
- urllib.parse.urlencode(
- {
- "client_id": self.client_id,
- "client_secret": self.client_secret,
- "code": self.device_code,
- "grant_type": "http://oauth.net/grant_type/device/1.0",
- }
- ),
- {"Content-type": "application/x-www-form-urlencoded"},
+ def __init__(self, client_secret_file='client_secret.json'):
+ self.credentials = None
+ self.credentials_pickle = './credentials.pickle'
+ if os.path.exists(self.credentials_pickle):
+ logger.debug(
+ f'Refreshing credentials from disk pickle file {self.credentials_pickle}'
)
- response = self.conn.getresponse()
- if response.status == 200:
- data = json.loads(response.read())
- if "access_token" in data:
- self.token = data
- self.save_token()
- else:
- time.sleep(self.retry_interval + 2)
- else:
- print("gdata: failed to get token")
- print((response.status))
- print((response.read()))
-
- def refresh_token(self) -> bool:
- if self.checking_too_often():
- print("gdata: not refreshing yet, too soon...")
- return False
+ self.credentials = pickle.load(open(self.credentials_pickle, 'rb'))
else:
- print("gdata: trying to refresh oauth token...")
- self.reset_connection()
- if self.token is None:
- return False
-
- refresh_token = self.token["refresh_token"]
- self.conn.request(
- "POST",
- "/o/oauth2/token",
- urllib.parse.urlencode(
- {
- "client_id": self.client_id,
- "client_secret": self.client_secret,
- "refresh_token": refresh_token,
- "grant_type": "refresh_token",
- }
- ),
- {"Content-type": "application/x-www-form-urlencoded"},
- )
+ logger.debug(
+ f'{self.credentials_pickle} does not exist; calling Google.'
+ )
+ self.refresh_credentials(client_secret_file)
+ self.save()
+ assert self.credentials is not None
- response = self.conn.getresponse()
- self.last_action = time.time()
- if response.status == 200:
- data: Dict = json.loads(response.read())
- if "access_token" in data:
- self.token = data
- # in fact we NEVER get a new refresh token at this point
- if not "refresh_token" in self.token:
- self.token["refresh_token"] = refresh_token
- self.save_token()
- return True
- print(("gdata: unexpected response %d to renewal request" % response.status))
- print((response.read()))
- return False
-
- def checking_too_often(self) -> bool:
- now = time.time()
- return (now - self.last_action) <= 30
-
- # https://developers.google.com/picasa-web/
- def photos_service(self):
- headers = {
- "Authorization": "%s %s"
- % (self.token["token_type"], self.token["access_token"])
- }
- client = gdata.photos.service.PhotosService(additional_headers=headers)
- return client
-
- # https://developers.google.com/drive/
- def docs_service(self):
- cred = OAuth2Credentials(
- self.token["access_token"],
- self.client_id,
- self.client_secret,
- self.token["refresh_token"],
- datetime.datetime.now(),
- "http://accounts.google.com/o/oauth2/token",
- "KitchenKiosk/0.9",
+ def refresh_credentials(self, client_secret_file):
+ scopes = [
+ 'https://www.googleapis.com/auth/calendar.events.readonly',
+ 'https://www.googleapis.com/auth/calendar.readonly',
+ 'https://www.googleapis.com/auth/drive.readonly',
+ 'https://www.googleapis.com/auth/drive.photos.readonly',
+ 'https://www.googleapis.com/auth/photoslibrary.readonly',
+ # 'https://www.googleapis.com/auth/keep.readonly',
+ ]
+ flow = InstalledAppFlow.from_client_secrets_file(
+ self.client_secret_file, scopes=scopes
)
- http = httplib2.Http(disable_ssl_certificate_validation=True)
- http = cred.authorize(http)
- service = build("drive", "v2", http)
- return service
+ self.credentials = flow.run_console()
+
+ def save(self):
+ pickle.dump(self.credentials, open(self.credentials_pickle, 'wb'))
- # https://developers.google.com/google-apps/calendar/
def calendar_service(self):
- cred = OAuth2Credentials(
- self.token["access_token"],
- self.client_id,
- self.client_secret,
- self.token["refresh_token"],
- datetime.datetime.now(),
- "http://accounts.google.com/o/oauth2/token",
- "KitchenKiosk/0.9",
- )
- http = httplib2.Http(disable_ssl_certificate_validation=True)
- http = cred.authorize(http)
- service = build("calendar", "v3", http)
- return service
+ return build("calendar", "v3", credentials=self.credentials)
+
+ def keep_service(self):
+ return build('keep', 'v1',
+ discoveryServiceUrl='https://keep.googleapis.com/$discovery/rest?version=v1',
+ credentials=self.credentials)
+ #print(gkeep_service.notes().list().execute())
+
+
+# class OAuth:
+# def __init__(self, client_id: str, client_secret: str) -> None:
+# print("gdata: initializing oauth token...")
+# self.client_id = client_id
+# self.client_secret = client_secret
+# self.user_code: Optional[str] = None
+# # print 'Client id: %s' % (client_id)
+# # print 'Client secret: %s' % (client_secret)
+# self.token: Optional[Dict] = None
+# self.device_code = None
+# self.verfication_url = None
+# self.token_file = "client_secrets.json"
+# scopes = [
+# 'https://www.googleapis.com/auth/calendar.events.readonly',
+# 'https://www.googleapis.com/auth/calendar.readonly',
+# 'https://www.googleapis.com/auth/drive.readonly',
+# 'https://www.googleapis.com/auth/drive.photos.readonly',
+# 'https://www.googleapis.com/auth/photoslibrary.readonly',
+# # 'https://www.googleapis.com/auth/keep.readonly',
+# ]
+# self.host = "accounts.google.com"
+# self.reset_connection()
+# self.load_token()
+# self.last_action = 0.0
+# self.ssl_ctx: Optional[ssl.SSLContext] = None
+
+# # this setup is isolated because it eventually generates a BadStatusLine
+# # exception, after which we always get httplib.CannotSendRequest errors.
+# # When this happens, we try re-creating the exception.
+# def reset_connection(self) -> None:
+# self.ssl_ctx = ssl.create_default_context() #cafile="/usr/local/etc/ssl/cert.pem")
+# http.client.HTTPConnection.debuglevel = 2
+# self.conn = http.client.HTTPSConnection(self.host, context=self.ssl_ctx)
+
+# def load_token(self) -> None:
+# token = None
+# if os.path.isfile(self.token_file):
+# f = open(self.token_file)
+# json_token = f.read()
+# self.token = json.loads(json_token)
+# f.close()
+
+# def save_token(self) -> None:
+# f = open(self.token_file, "w")
+# f.write(json.dumps(self.token))
+# f.close()
+
+# def has_token(self) -> bool:
+# if self.token is not None:
+# print("gdata: we have a token!")
+# else:
+# print("gdata: we have no token.")
+# return self.token is not None
+
+# def get_user_code(self) -> Optional[str]:
+# self.conn.request(
+# "POST",
+# "/o/oauth2/device/code",
+# urllib.parse.urlencode(
+# {"client_id": self.client_id, "scope": " ".join(self.scope)}
+# ),
+# {"Content-type": "application/x-www-form-urlencoded"},
+# )
+# response = self.conn.getresponse()
+# if response.status == 200:
+# data = json.loads(response.read())
+# self.device_code = data["device_code"]
+# self.user_code = data["user_code"]
+# self.verification_url = data["verification_url"]
+# self.retry_interval = data["interval"]
+# else:
+# self.user_code = None
+# print(f"gdata: {response.status}")
+# print(response.read())
+# sys.exit(-1)
+# return self.user_code
+
+# def get_new_token(self) -> None:
+# # call get_device_code if not already set
+# if self.user_code is None:
+# print("gdata: getting user code")
+# self.get_user_code()
+
+# while self.token is None:
+# self.conn.request(
+# "POST",
+# "/o/oauth2/token",
+# urllib.parse.urlencode(
+# {
+# "client_id": self.client_id,
+# "client_secret": self.client_secret,
+# "code": self.device_code,
+# "grant_type": "http://oauth.net/grant_type/device/1.0",
+# }
+# ),
+# {"Content-type": "application/x-www-form-urlencoded"},
+# )
+# response = self.conn.getresponse()
+# if response.status == 200:
+# data = json.loads(response.read())
+# if "access_token" in data:
+# self.token = data
+# self.save_token()
+# else:
+# time.sleep(self.retry_interval + 2)
+# else:
+# print("gdata: failed to get token")
+# print((response.status))
+# print((response.read()))
+
+# def refresh_token(self) -> bool:
+# if self.checking_too_often():
+# print("gdata: not refreshing yet, too soon...")
+# return False
+# else:
+# print("gdata: trying to refresh oauth token...")
+# self.reset_connection()
+# if self.token is None:
+# return False
+
+# refresh_token = self.token["refresh_token"]
+# self.conn.request(
+# "POST",
+# "/o/oauth2/token",
+# urllib.parse.urlencode(
+# {
+# "client_id": self.client_id,
+# "client_secret": self.client_secret,
+# "refresh_token": refresh_token,
+# "grant_type": "refresh_token",
+# }
+# ),
+# {"Content-type": "application/x-www-form-urlencoded"},
+# )
+
+# response = self.conn.getresponse()
+# self.last_action = time.time()
+# if response.status == 200:
+# data: Dict = json.loads(response.read())
+# if "access_token" in data:
+# self.token = data
+# # in fact we NEVER get a new refresh token at this point
+# if not "refresh_token" in self.token:
+# self.token["refresh_token"] = refresh_token
+# self.save_token()
+# return True
+# print(("gdata: unexpected response %d to renewal request" % response.status))
+# print((response.read()))
+# return False
+
+# def checking_too_often(self) -> bool:
+# now = time.time()
+# return (now - self.last_action) <= 30
+
+# # https://developers.google.com/picasa-web/
+# def photos_service(self):
+# headers = {
+# "Authorization": "%s %s"
+# % (self.token["token_type"], self.token["access_token"])
+# }
+# client = gdata.photos.service.PhotosService(additional_headers=headers)
+# return client
+
+# # https://developers.google.com/drive/
+# def docs_service(self):
+# cred = OAuth2Credentials(
+# self.token["access_token"],
+# self.client_id,
+# self.client_secret,
+# self.token["refresh_token"],
+# datetime.datetime.now(),
+# "http://accounts.google.com/o/oauth2/token",
+# "KitchenKiosk/0.9",
+# )
+# http = httplib2.Http(disable_ssl_certificate_validation=True)
+# http = cred.authorize(http)
+# service = build("drive", "v2", http)
+# return service
+
+# # https://developers.google.com/google-apps/calendar/
+# def calendar_service(self):
+# cred = OAuth2Credentials(
+# self.token["access_token"],
+# self.client_id,
+# self.client_secret,
+# self.token["refresh_token"],
+# datetime.datetime.now(),
+# "http://accounts.google.com/o/oauth2/token",
+# "KitchenKiosk/0.9",
+# )
+# http = httplib2.Http(disable_ssl_certificate_validation=True)
+# http = cred.authorize(http)
+# service = build("calendar", "v3", http)
+# return service
self.page_title = page_title
self.news = grab_bag.grab_bag()
self.details = grab_bag.grab_bag()
- self.filter = profanity_filter.profanity_filter()
+ self.filter = profanity_filter.ProfanityFilter()
@abstractmethod
def debug_prefix(self) -> str:
def find_title(self, item: ET.Element) -> Optional[str]:
return item.findtext("title")
- def munge_title(self, title: str) -> str:
+ def munge_title(self, title: str, item: ET.Element) -> str:
return title
def find_description(self, item: ET.Element) -> Optional[str]:
return item.findtext("description")
- def munge_description(self, description: str) -> str:
+ def munge_description(
+ self,
+ description: str,
+ item: ET.Element
+ ) -> str:
description = re.sub("<[^>]+>", "", description)
return description
) -> bool:
return True
+ def do_headlines(self) -> bool:
+ return True
+
+ def do_details(self) -> bool:
+ return True
+
def is_item_older_than_n_days(self, item: ET.Element, n: int) -> bool:
pubdate = self.find_pubdate(item)
if pubdate is None:
raise Exception
def shuffle_news(self) -> bool:
- headlines = page_builder.page_builder()
- headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
- headlines.set_title("%s" % self.page_title)
- subset = self.news.subset(4)
- if subset is None:
- self.debug_print("Not enough messages to choose from.")
- return False
- for msg in subset:
- headlines.add_item(msg)
- headlines.set_custom_html(
- """
-<STYLE>
-a:link {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-a:visited {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-a:active {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-</STYLE>"""
- )
- _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
- with file_writer.file_writer(_) as f:
- headlines.render_html(f)
-
- details = page_builder.page_builder()
- details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
- details.set_custom_html(
- """
-<STYLE>
-a:link {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-a:visited {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-a:active {
- color: black;
- text-decoration: none;
- font-weight: bold;
-}
-</STYLE>"""
- )
- details.set_title(f"{self.page_title}")
- subset = self.details.subset(1)
- if subset is None:
- self.debug_print("Not enough details to choose from.")
- return False
- for msg in subset:
- blurb = msg
- blurb += "</TD>"
- details.add_item(blurb)
- _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
- with file_writer.file_writer(_) as g:
- details.render_html(g)
+ if self.do_headlines():
+ headlines = page_builder.page_builder()
+ headlines.set_layout(page_builder.page_builder.LAYOUT_FOUR_ITEMS)
+ headlines.set_title("%s" % self.page_title)
+ subset = self.news.subset(4)
+ if subset is None:
+ self.debug_print("Not enough messages to choose from.")
+ return False
+ for msg in subset:
+ headlines.add_item(msg)
+ headlines.set_custom_html(
+ """
+ <STYLE>
+ a:link {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:visited {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:active {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ </STYLE>"""
+ )
+ _ = f"{self.get_headlines_page_prefix()}_{self.get_headlines_page_priority()}_25900.html"
+ with file_writer.file_writer(_) as f:
+ headlines.render_html(f)
+
+ if self.do_details():
+ details = page_builder.page_builder()
+ details.set_layout(page_builder.page_builder.LAYOUT_ONE_ITEM)
+ details.set_custom_html(
+ """
+ <STYLE>
+ a:link {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:visited {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ a:active {
+ color: black;
+ text-decoration: none;
+ font-weight: bold;
+ }
+ </STYLE>"""
+ )
+ details.set_title(f"{self.page_title}")
+ subset = self.details.subset(1)
+ if subset is None:
+ self.debug_print("Not enough details to choose from.")
+ return False
+ for msg in subset:
+ blurb = msg
+ blurb += "</TD>"
+ details.add_item(blurb)
+ _ = f"{self.get_details_page_prefix()}_{self.get_details_page_priority()}_86400.html"
+ with file_writer.file_writer(_) as g:
+ details.render_html(g)
return True
def fetch_news(self) -> bool:
)
try:
response = self.conn.getresponse()
- except:
- print("Exception in generic RSS renderer HTTP connection")
+ except Exception as e:
+ traceback.print_exc(file=sys.stdout)
+ print(
+ f"Exception in generic RSS renderer HTTP connection fetching {self.feed_site}{uri}"
+ )
return False
if response.status != 200:
rss = ET.fromstring(response.read())
channel = rss[0]
+ title_filter = set()
for item in channel.getchildren():
title = self.find_title(item)
- if title is not None:
- title = self.munge_title(title)
description = item.findtext("description")
+ if title is not None:
+ title = self.munge_title(title, item)
if description is not None:
- description = self.munge_description(description)
+ description = self.munge_description(description, item)
else:
description = ""
image = self.find_image(item)
continue
if self.should_profanity_filter() and (
- self.filter.contains_bad_words(title)
- or self.filter.contains_bad_words(description)
+ self.filter.contains_bad_word(title)
+ or self.filter.contains_bad_word(description)
):
self.debug_print(f'Found bad words in item "{title}"')
continue
+ if title in title_filter:
+ self.debug_print(f'Already saw title {title}, skipping.')
+ title_filter.add(title)
+
blurb = """<DIV style="padding:8px;
- font-size:34pt;
- -webkit-column-break-inside:avoid;">"""
+ font-size:34pt;
+ -webkit-column-break-inside:avoid;">"""
if image is not None:
blurb += f'<IMG SRC="{image}" ALIGN=LEFT HEIGHT=115 '
blurb += 'style="padding:8px;">'
import re
from typing import List, Dict
+from google_auth_oauthlib.flow import InstalledAppFlow
+
import constants
import file_writer
import renderer
-import secrets
+import kiosk_secrets as secrets
class gkeep_renderer(renderer.debuggable_abstaining_renderer):
import re
from typing import Dict, List, Optional
import xml
+import xml.etree.ElementTree as ET
import generic_news_rss_renderer
descr = ""
return descr
- def munge_description_internal(self, descr: str) -> str:
+ def munge_description_internal(self, descr: str, item: ET.Element) -> str:
if len(descr) > 450:
descr = descr[:450]
descr = re.sub(r"\<[^\>]*$", "", descr)
descr += "</A></LI></UL></OL></P>"
return descr
- def munge_description(self, description: str) -> str:
+ def munge_description(self, description: str, item: ET.Element) -> str:
soup = BeautifulSoup(description, features="lxml")
for a in soup.findAll("a"):
del a["href"]
descr = str(soup)
- return self.munge_description_internal(descr)
+ return self.munge_description_internal(descr, item)
def find_image(self, item: xml.etree.ElementTree.Element) -> Optional[str]:
return None
#!/usr/bin/env python3
+from collections import Counter
import random
from typing import Iterable, List, Optional, Set
def subset(self, count: int) -> Optional[List[str]]:
if len(self.contents) < count:
return None
- subset = random.sample(self.contents, count)
- return subset
+ return random.sample(self.contents, count)
def size(self) -> int:
return len(self.contents)
# x = grab_bag()
-# x.add_all([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
-# print x.subset(3)
+# x.add_all(["oneA", "two", "three", "oneB", "four", "five", "oneC", "oneD"])
+# print(x.subset(3))
#!/usr/bin/env python3
+import collections
from datetime import datetime
import difflib
import gc
import linecache
+import logging
import os
import re
import sys
import astral # type: ignore
from astral.sun import sun # type: ignore
+import numpy as np
+import pytz
import constants
+import file_writer
import renderer
import renderer
import renderer_catalog
import trigger_catalog
import utils
+logger = logging.getLogger(__file__)
+
def thread_janitor() -> None:
tracemalloc.start()
return best_page
-def process_command(command: str, page_history: List[str]) -> str:
+def process_command(command: str, page_history: List[str], page_chooser) -> str:
page = None
if 'hold' in command:
page = page_history[0]
swap_page_target = 0.0
def filter_news_during_dinnertime(page: str) -> bool:
- now = datetime.now()
+ now = datetime.now(tz=pytz.timezone("US/Pacific"))
is_dinnertime = now.hour >= 17 and now.hour <= 20
return not is_dinnertime or not (
"cnn" in page
pass
if command is not None:
triggered = True
- page = process_command(command, page_history)
+ page = process_command(command, page_history, page_chooser)
# Else pick a page randomly.
else:
override_refresh_sec: int = None,
command: str = None) -> None:
def pick_background_color() -> str:
+ now = datetime.now(tz=pytz.timezone("US/Pacific"))
city = astral.LocationInfo(
"Bellevue", "USA", "US/Pacific", 47.610, -122.201
)
- s = sun(city.observer, date=self.dt, tzinfo=pytz.timezone("US/Pacific"))
+ s = sun(city.observer, date=now, tzinfo=pytz.timezone("US/Pacific"))
sunrise_mod = utils.minute_number(s["sunrise"].hour, s["sunrise"].minute)
sunset_mod = utils.minute_number(s["sunset"].hour, s["sunset"].minute)
- now = datetime.now()
now_mod = utils.minute_number(now.hour, now.minute)
if now_mod < sunrise_mod or now_mod > (sunset_mod + 45):
return "E6B8B8"
def get_refresh_period() -> float:
if override_refresh_sec is not None:
return float(override_refresh_sec * 1000.0)
- now = datetime.now()
+ now = datetime.now(tz=pytz.timezone("US/Pacific"))
if now.hour < 7:
return float(constants.refresh_period_night_sec * 1000.0)
else:
def thread_invoke_renderers() -> None:
+ render_times: Dict[str, np.array] = {}
+ render_counts: collections.Counter = collections.Counter()
+ last_render: Dict[str, datetime] = {}
+
while True:
- print(f"renderer[{utils.timestamp()}]: invoking all renderers in catalog...")
+ print(f'renderer[{utils.timestamp()}]: invoking all overdue renderers in catalog...')
for r in renderer_catalog.get_renderers():
+ name = r.get_name()
now = time.time()
try:
r.render()
except Exception as e:
- traceback.print_exc()
+ traceback.print_exc(file=sys.stdout)
+ logger.exception(e)
print(
- f"renderer[{utils.timestamp()}] unknown exception in {r.get_name()}, swallowing it."
+ f"renderer[{utils.timestamp()}] Unknown exception ({e}) in {name}, swallowing it."
)
+
+ # Increment the count of render operations per renderer.
+ render_counts[name] += 1
+
+ # Keep track of the last time we invoked each renderer.
+ last_render[name] = datetime.now(tz=pytz.timezone("US/Pacific"))
+
+ # Record how long each render operation takes and warn if very long.
delta = time.time() - now
+ times = render_times.get(name, np.array([]))
+ times = np.insert(times, 0, delta)
+ render_times[name] = times
if delta > 1.0:
+ hdr = f'renderer[{utils.timestamp()}]:'
print(
- f"renderer[{utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:5.2f}s."
- )
+f'''
+{hdr} Warning: {name}'s rendering took {delta:5.2f}s.
+{hdr} FYI: {name}'s render times: p25={np.percentile(times, 25):5.2f}, p50={np.percentile(times, 50):5.2f}, p75={np.percentile(times, 75):5.2f}, p90={np.percentile(times, 90):5.2f}, p99={np.percentile(times, 99):5.2f}
+''')
+
+ # Render a page about internal stats of renderers.
+ print(f'renderer[{utils.timestamp()}]: Updating internal statistics page.')
+ with file_writer.file_writer(constants.internal_stats_pagename) as f:
+ f.write(
+f'''
+<TABLE BORDER=0>
+ <TR>
+ <TH>Renderer Name</TH>
+ <TH>Last Run</TH>
+ <TH>Num Invocations</TH>
+ <TH>Render Latency</TH>
+ </TR>
+''')
+ for n, r in enumerate(renderer_catalog.get_renderers()):
+ if n % 2 == 0:
+ style = 'style="margin: 0; padding: 0; background: #c6b0b0;"'
+ else:
+ style = 'style="margin: 0; padding: 0; background: #eeeeee;"'
+ name = r.get_name()
+ last = last_render.get(name, None)
+ if last is None:
+ last = 'never'
+ else:
+ last = last.strftime('%Y/%m/%d %I:%M:%S%P')
+ count = render_counts.get(name, 0)
+ latency = render_times.get(name, np.array([]))
+ p25 = p50 = p75 = p90 = p99 = 'N/A'
+ try:
+ p25 = np.percentile(latency, 25)
+ p50 = np.percentile(latency, 50)
+ p75 = np.percentile(latency, 75)
+ p90 = np.percentile(latency, 90)
+ p99 = np.percentile(latency, 99)
+ except IndexError:
+ pass
+ f.write(
+f'''
+ <TR>
+ <TD {style}>{name}</TD>
+ <TD {style}>{last}</TD>
+ <TD {style}><CENTER>{count}</CENTER></TD>
+ <TD {style}>p25={p25:5.2f}, p50={p50:5.2f}, p75={p75:5.2f}, p90={p90:5.2f}, p99={p99:5.2f}</TD>
+ </TR>
+''')
+ f.write('</TABLE>')
+
print(
- f"renderer[{utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..."
+ f"renderer[{utils.timestamp()}]: " +
+ f"thread having a little break for {constants.render_period_sec}s..."
)
time.sleep(constants.render_period_sec)
sample_rate = porcupine.sample_rate,
sample_width = 2, # 16 bits
)
- try:
- command = recognizer.recognize_google(speech)
- print('[%s] >>>>>>>>>>>>> Google says command was %s' % (
- str(datetime.now()), command)
- )
- except Exception:
- command = 'weather'
+ command = recognizer.recognize_google(speech)
+ print('[%s] >>>>>>>>>>>>> Google says command was %s' % (
+ str(datetime.now()), command)
+ )
self._queue.put(command)
except Exception as e:
keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["blueberry", "bumblebee"]]
sensitivities = [0.85, 0.95]
HotwordListener(
+ [],
keyword_paths,
sensitivities,
).listen_forever()
class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
"""A renderer that uses a local mirror of Google photos"""
- album_root_directory = "/var/www/kiosk/pages/images/gphotos/albums"
+ album_root_directory = "/var/www/html/kiosk/images/gphotos/albums"
album_whitelist = frozenset(
[
"Newer Alex Photos",
"Ohme Gardens",
"Olympic Sculpture Park",
+ "Portland, ME 2021",
"Prague and Munich 2019",
"Random",
"Scott and Lynn",
+ "Sculpture Place",
"SFO 2014",
"Skiing with Alex",
"Sonoma",
"Trip to East Coast '16",
"Tuscany 2008",
"Yosemite 2010",
+ "WA Roadtrip, 2021",
"Zoo",
]
)
if extension in self.extension_whitelist:
photo_path = os.path.join(root, filename)
photo_url = photo_path.replace(
- "/var/www/", "http://kiosk.house/", 1
+ "/var/www/html", "http://kiosk.house/", 1
)
self.candidate_photos.add(photo_url)
return True
--- /dev/null
+#!/usr/bin/env python3
+
+from datetime import datetime
+import difflib
+import gc
+import linecache
+import os
+import re
+import sys
+from threading import Thread
+import time
+import traceback
+import tracemalloc
+from typing import Optional, List
+from queue import Queue, Empty
+
+import astral # type: ignore
+from astral.sun import sun # type: ignore
+import pytz
+
+import constants
+import renderer
+import renderer
+import renderer_catalog
+import chooser
+import listen
+import logging
+import pvporcupine
+import trigger_catalog
+import utils
+
+
+def thread_janitor() -> None:
+ tracemalloc.start()
+ tracemalloc_target = 0.0
+ gc_target = 0.0
+ gc.enable()
+
+ while True:
+ now = time.time()
+ if now > tracemalloc_target:
+ tracemalloc_target = now + 30.0
+ snapshot = tracemalloc.take_snapshot()
+ snapshot = snapshot.filter_traces((
+ tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
+ tracemalloc.Filter(False, "<unknown>"),
+ ))
+ key_type = 'lineno'
+ limit = 10
+ top_stats = snapshot.statistics(key_type)
+ print("janitor: Top %s lines" % limit)
+ for index, stat in enumerate(top_stats[:limit], 1):
+ frame = stat.traceback[0]
+ # replace "/path/to/module/file.py" with "module/file.py"
+ filename = os.sep.join(frame.filename.split(os.sep)[-2:])
+ print("janitor: #%s: %s:%s: %.1f KiB"
+ % (index, filename, frame.lineno, stat.size / 1024))
+ line = linecache.getline(frame.filename, frame.lineno).strip()
+ if line:
+ print('janitor: %s' % line)
+
+ other = top_stats[limit:]
+ if other:
+ size = sum(stat.size for stat in other)
+ print("janitor: %s other: %.1f KiB" % (len(other), size / 1024))
+ total = sum(stat.size for stat in top_stats)
+ print("janitor: Total allocated size: %.1f KiB" % (total / 1024))
+ if now > gc_target:
+ print("janitor: Running gc operation")
+ gc_target = now + 60.0
+ gc.collect()
+ time.sleep(10.0)
+
+
+def guess_page(command: str, page_chooser: chooser.chooser) -> str:
+ best_page = None
+ best_score = None
+ for page in page_chooser.get_page_list():
+ page = page.replace('(', ' ')
+ page = page.replace('_', ' ')
+ page = page.replace(')', ' ')
+ page = page.replace('.html', '')
+ page = page.replace('CNNNews', 'news')
+ page = page.replace('CNNTechnology', 'technology')
+ page = page.replace('gocostco', 'costco list')
+ page = page.replace('gohardware', 'hardware list')
+ page = page.replace('gohouse', 'house list honey do')
+ page = page.replace('gcal', 'google calendar events')
+ page = page.replace('mynorthwest', 'northwest news')
+ page = page.replace('myq', 'myq garage door status')
+ page = page.replace('gomenu', 'dinner menu')
+ page = page.replace('wsdot', 'traffic')
+ page = page.replace('gomenu', 'dinner menu')
+ page = page.replace('WSJNews', 'news')
+ page = page.replace('telma', 'telma cabin')
+ page = page.replace('WSJBusiness', 'business news')
+ page = re.sub(r'[0-9]+', '', page)
+ score = SequenceMatcher(None, command, page).ratio()
+ if best_score is None or score > best_score:
+ best_page = page
+ assert best_page is not None
+ return best_page
+
+
+def process_command(command: str, page_history: List[str]) -> str:
+ page = None
+ if 'hold' in command:
+ page = page_history[0]
+ elif 'back' in command:
+ page = page_history[1]
+ elif 'skip' in command:
+ while True:
+ (page, _) = page_chooser.choose_next_page()
+ if page != page_history[0]:
+ break
+ elif 'weather' in command:
+ if 'telma' in command or 'cabin' in command:
+ page = 'weather-telma_3_10800.html'
+ elif 'stevens' in command:
+ page = 'weather-stevens_3_10800.html'
+ else:
+ page = 'weather-home_3_10800.html'
+ elif 'cabin' in command:
+ if 'list' in command:
+ page = 'Cabin-(gocabin)_2_3600.html'
+ else:
+ page = 'hidden/cabin_driveway.html'
+ elif 'news' in command or 'headlines' in command:
+ page = 'cnn-CNNNews_4_25900.html'
+ elif 'clock' in command or 'time' in command:
+ page = 'clock_10_none.html'
+ elif 'countdown' in command or 'countdowns' in command:
+ page = 'countdown_3_7200.html'
+ elif 'costco' in command:
+ page = 'Costco-(gocostco)_2_3600.html'
+ elif 'calendar' in command or 'events' in command:
+ page = 'gcal_3_86400.html'
+ elif 'countdown' in command or 'countdowns' in command:
+ page = 'countdown_3_7200.html'
+ elif 'grocery' in command or 'groceries' in command:
+ page = 'Grocery-(gogrocery)_2_3600.html'
+ elif 'hardware' in command:
+ page = 'Hardware-(gohardware)_2_3600.html'
+ elif 'garage' in command:
+ page = 'myq_4_300.html'
+ elif 'menu' in command:
+ page = 'Menu-(gomenu)_2_3600.html'
+ elif 'cron' in command or 'health' in command:
+ page = 'periodic-health_6_300.html'
+ elif 'photo' in command or 'picture' in command:
+ page = 'photo_23_3600.html'
+ elif 'quote' in command or 'quotation' in command or 'quotes' in command:
+ page = 'quotes_4_10800.html'
+ elif 'stevens' in command:
+ page = 'stevens-conditions_1_86400.html'
+ elif 'stock' in command or 'stocks' in command:
+ page = 'stock_3_86400.html'
+ elif 'twitter' in command:
+ page = 'twitter_10_3600.html'
+ elif 'traffic' in command:
+ page = 'wsdot-bridges_3_none.html'
+ elif 'front' in command and 'door' in command:
+ page = 'hidden/frontdoor.html'
+ elif 'driveway' in command:
+ page = 'hidden/driveway.html'
+ elif 'backyard' in command:
+ page = 'hidden/backyard.html'
+ else:
+ page = guess_page(command, page_chooser)
+ assert page is not None
+ return page
+
+
+def thread_change_current(command_queue: Queue) -> None:
+ page_history = [ "", "" ]
+ swap_page_target = 0.0
+
+ def filter_news_during_dinnertime(page: str) -> bool:
+ now = datetime.now()
+ is_dinnertime = now.hour >= 17 and now.hour <= 20
+ print(f"is dinnertime = {is_dinnertime}")
+ print(f'page = {page}')
+ return not is_dinnertime or not (
+ "cnn" in page
+ or "news" in page
+ or "mynorthwest" in page
+ or "seattle" in page
+ or "stranger" in page
+ or "twitter" in page
+ or "wsj" in page
+ )
+ page_chooser = chooser.weighted_random_chooser_with_triggers(
+ trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
+ )
+
+ while True:
+ now = time.time()
+
+ # Check for a verbal command.
+ command = None
+ try:
+ command = command_queue.get(block=False)
+ except Exception:
+ command = None
+ pass
+ if command is not None:
+ triggered = True
+ page = process_command(command, page_history)
+
+ # Else pick a page randomly.
+ else:
+ while True:
+ (page, triggered) = page_chooser.choose_next_page()
+ if triggered or page != page_history[0]:
+ break
+
+ if triggered:
+ print("chooser[%s] - WE ARE TRIGGERED." % utils.timestamp())
+ if page != page_history[0] or (swap_page_target - now < 10.0):
+ print(
+ "chooser[%s] - EMERGENCY PAGE %s LOAD NEEDED"
+ % (utils.timestamp(), page)
+ )
+ try:
+ with open(os.path.join(constants.pages_dir, "current.shtml"), "w") as f:
+ emit_wrapped(f, page, override_refresh_sec = 40, command = command)
+ page_history.insert(0, page)
+ page_history = page_history[0:10]
+ swap_page_target = now + 40
+ except:
+ print("chooser[%s] - page does not exist?!" % (utils.timestamp()))
+ continue
+
+ # Also notify XMLHTTP clients that they need to refresh now.
+ path = os.path.join(constants.pages_dir, "reload_immediately.html")
+ with open(path, "w") as f:
+ f.write("Reload, suckers!")
+
+ # Fix this hack... maybe read the webserver logs and see if it
+ # actually was picked up?
+ time.sleep(0.75)
+ os.remove(path)
+
+ elif now >= swap_page_target:
+ assert page != page_history[0]
+ print("chooser[%s] - nominal choice of %s" % (utils.timestamp(), page))
+ try:
+ with open(os.path.join(constants.pages_dir, "current.shtml"), "w") as f:
+ emit_wrapped(f, page)
+ page_history.insert(0, page)
+ page_history = page_history[0:10]
+ swap_page_target = now + constants.refresh_period_sec
+ except:
+ print("chooser[%s] - page does not exist?!" % (utils.timestamp()))
+ continue
+ time.sleep(1)
+
+
+def emit_wrapped(f,
+ filename: str,
+ *,
+ override_refresh_sec: int = None,
+ command: str = None) -> None:
+ def pick_background_color() -> str:
+ now = datetime.now(tz=pytz.timezone("US/Pacific"))
+ city = astral.LocationInfo(
+ "Bellevue", "USA", "US/Pacific", 47.610, -122.201
+ )
+ s = sun(city.observer, date=now, tzinfo=pytz.timezone("US/Pacific"))
+ sunrise_mod = utils.minute_number(s["sunrise"].hour, s["sunrise"].minute)
+ sunset_mod = utils.minute_number(s["sunset"].hour, s["sunset"].minute)
+ now_mod = utils.minute_number(now.hour, now.minute)
+ if now_mod < sunrise_mod or now_mod > (sunset_mod + 45):
+ return "E6B8B8"
+ elif now_mod < (sunrise_mod + 45) or now_mod > (sunset_mod + 120):
+ return "EECDCD"
+ else:
+ return "FFFFFF"
+
+ def get_refresh_period() -> float:
+ if override_refresh_sec is not None:
+ return float(override_refresh_sec * 1000.0)
+ now = datetime.now()
+ if now.hour < 7:
+ return float(constants.refresh_period_night_sec * 1000.0)
+ else:
+ return float(constants.refresh_period_sec * 1000.0)
+
+ age = utils.describe_age_of_file_briefly(f"pages/{filename}")
+ bgcolor = pick_background_color()
+ if command is None:
+ pageid = filename
+ else:
+ pageid = f'"{command}" -> {filename}'
+
+ f.write(
+ """
+<HEAD>
+ <TITLE>Kitchen Kiosk</TITLE>
+ <LINK rel="stylesheet" type="text/css" href="style.css">
+ <SCRIPT TYPE="text/javascript">
+
+ // Zoom the 'contents' div to fit without scrollbars and then make
+ // it visible.
+ function zoomScreen() {
+ z = 285;
+ do {
+ document.getElementById("content").style.zoom = z+"%%";
+ var body = document.body;
+ var html = document.documentElement;
+ var height = Math.max(body.scrollHeight,
+ body.offsetHeight,
+ html.clientHeight,
+ html.scrollHeight,
+ html.offsetHeight);
+ var windowHeight = window.innerHeight;
+ var width = Math.max(body.scrollWidth,
+ body.offsetWidth,
+ html.clientWidth,
+ html.scrollWidth,
+ html.offsetWidth);
+ var windowWidth = window.innerWidth;
+ var heightRatio = height / windowHeight;
+ var widthRatio = width / windowWidth;
+
+ if (heightRatio <= 1.0 && widthRatio <= 1.0) {
+ break;
+ }
+ z -= 4;
+ } while(z >= 70);
+ document.getElementById("content").style.visibility = "visible";
+ }
+
+ // Load IMG tags with DATA-SRC attributes late.
+ function lateLoadImages() {
+ var image = document.getElementsByTagName('img');
+ for (var i = 0; i < image.length; i++) {
+ if (image[i].getAttribute('DATA-SRC')) {
+ image[i].setAttribute('SRC', image[i].getAttribute('DATA-SRC'));
+ }
+ }
+ }
+
+ // Operate the clock at the top of the page.
+ function runClock() {
+ var today = new Date();
+ var h = today.getHours();
+ var ampm = h >= 12 ? 'pm' : 'am';
+ h = h %% 12;
+ h = h ? h : 12; // the hour '0' should be '12'
+ var m = maybeAddZero(today.getMinutes());
+ var colon = ":";
+ if (today.getSeconds() %% 2 == 0) {
+ colon = "<FONT STYLE='color: #%s; font-size: 4vmin; font-weight: bold'>:</FONT>";
+ }
+ document.getElementById("time").innerHTML = h + colon + m + ampm;
+ document.getElementById("date").innerHTML = today.toDateString();
+ var t = setTimeout(function(){runClock()}, 1000);
+ }
+
+ // Helper method for running the clock.
+ function maybeAddZero(x) {
+ return (x < 10) ? "0" + x : x;
+ }
+
+ // Do something on page load.
+ function addLoadEvent(func) {
+ var oldonload = window.onload;
+ if (typeof window.onload != 'function') {
+ window.onload = func;
+ } else {
+ window.onload = function() {
+ if (oldonload) {
+ oldonload();
+ }
+ func();
+ }
+ }
+ }
+
+ // Sleep thread helper.
+ const sleep = (milliseconds) => {
+ return new Promise(resolve => setTimeout(resolve, milliseconds))
+ }
+
+ var loadedDate = new Date();
+
+ addLoadEvent(zoomScreen);
+ addLoadEvent(runClock);
+ addLoadEvent(lateLoadImages);
+
+ // Runs the countdown line at the bottom and is responsible for
+ // normal page reloads caused by the expiration of a timer.
+ (function countdown() {
+ setTimeout(
+ function() {
+ var now = new Date();
+ var deltaMs = now.getTime() - loadedDate.getTime();
+ var totalMs = %d;
+ var remainingMs = (totalMs - deltaMs);
+
+ if (remainingMs > 0) {
+ var hr = document.getElementById("countdown");
+ var width = (remainingMs / (totalMs - 5000)) * 100.0;
+ if (width <= 100) {
+ hr.style.visibility = "visible";
+ hr.style.width = " ".concat(width, "%%");
+ hr.style.backgroundColor = "maroon";
+ }
+ } else {
+ // Reload unconditionally after 22 sec.
+ window.location.reload();
+ }
+
+ // Brief sleep before doing it all over again.
+ sleep(50).then(() => {
+ countdown();
+ });
+ }, 50)
+ })();
+
+ // Periodically checks for emergency reload events.
+ (function poll() {
+ setTimeout(
+ function() {
+ var xhr = new XMLHttpRequest();
+ xhr.open('GET',
+ 'http://%s/kiosk/pages/reload_immediately.html');
+ xhr.onload =
+ function() {
+ if (xhr.status === 200) {
+ window.location.reload();
+ } else {
+ sleep(500).then(() => {
+ poll();
+ });
+ }
+ };
+ xhr.send();
+ }, 500);
+ })();
+</SCRIPT>
+</HEAD>
+<BODY BGCOLOR="#%s">
+ <TABLE style="height:100%%; width:100%%" BORDER=0>
+ <TR HEIGHT=30>
+ <TD ALIGN="left">
+ <DIV id="date"> </DIV>
+ </TD>
+ <TD ALIGN="center"><FONT COLOR=#bbbbbb>
+ <DIV id="info"></DIV></FONT>
+ </TD>
+ <TD ALIGN="right">
+ <DIV id="time"> </DIV>
+ </TD>
+ </TR>
+ <TR STYLE="vertical-align:top">
+ <TD COLSPAN=3>
+ <DIV ID="content" STYLE="zoom: 1; visibility: hidden;">
+ <!-- BEGIN main page contents. -->
+<!--#include virtual=\"%s\"-->
+ <!-- END main page contents. -->
+ </DIV>
+ <BR>
+ <DIV STYLE="position: absolute; top:1030px; width:99%%">
+ <P ALIGN="right">
+ <FONT SIZE=2 COLOR=#bbbbbb>%s @ %s ago.</FONT>
+ </P>
+ <HR id="countdown" STYLE="width:0px;
+ text-align:left;
+ margin:0;
+ border:none;
+ border-width:0;
+ height:5px;
+ visibility:hidden;
+ background-color:#ffffff;">
+ </DIV>
+ </TD>
+ </TR>
+ </TABLE>
+</BODY>"""
+ % (
+ bgcolor,
+ get_refresh_period(),
+ constants.hostname,
+ bgcolor,
+ filename,
+ pageid,
+ age,
+ )
+ )
+
+
+def thread_invoke_renderers() -> None:
+ while True:
+ print(f"renderer[{utils.timestamp()}]: invoking all renderers in catalog...")
+ for r in renderer_catalog.get_renderers():
+ now = time.time()
+ try:
+ r.render()
+ except Exception as e:
+ traceback.print_exc(file=sys.stdout)
+ print(
+ f"renderer[{utils.timestamp()}] unknown exception ({e}) in {r.get_name()}, swallowing it."
+ )
+ delta = time.time() - now
+ if delta > 1.0:
+ print(
+ f"renderer[{utils.timestamp()}]: Warning: {r.get_name()}'s rendering took {delta:5.2f}s."
+ )
+ print(
+ f"renderer[{utils.timestamp()}]: thread having a little break for {constants.render_period_sec}s..."
+ )
+ time.sleep(constants.render_period_sec)
+
+
+if __name__ == "__main__":
+ logging.basicConfig()
+ command_queue: Queue = Queue()
+ changer_thread: Optional[Thread] = None
+ renderer_thread: Optional[Thread] = None
+ janitor_thread: Optional[Thread] = None
+ hotword_thread: Optional[Thread] = None
+ while True:
+ if hotword_thread is None or not hotword_thread.is_alive():
+ keyword_paths = [pvporcupine.KEYWORD_PATHS[x] for x in ["bumblebee"]]
+ sensitivities = [0.7] * len(keyword_paths)
+ listener = listen.HotwordListener(
+ command_queue,
+ keyword_paths,
+ sensitivities,
+ )
+ hotword_thread = Thread(target=listener.listen_forever, args=())
+ hotword_thread.start()
+ if changer_thread is None or not changer_thread.is_alive():
+ print(
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing chooser thread... (wtf?!)"
+ )
+ changer_thread = Thread(target=thread_change_current, args=(command_queue,))
+ changer_thread.start()
+ if renderer_thread is None or not renderer_thread.is_alive():
+ print(
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing render thread... (wtf?!)"
+ )
+ renderer_thread = Thread(target=thread_invoke_renderers, args=())
+ renderer_thread.start()
+ if janitor_thread is None or not janitor_thread.is_alive():
+ print(
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing janitor thread... (wtf?!)"
+ )
+ janitor_thread = Thread(target=thread_janitor, args=())
+ janitor_thread.start()
+ time.sleep(60)
import constants
import file_writer
import renderer
-import secrets
+import kiosk_secrets as secrets
import utils
def get_state_icon(self, state: str) -> str:
if state == "open":
- return "/kiosk/pages/images/garage_open.png"
+ return "/kiosk/images/garage_open.png"
elif state == "closed":
- return "/kiosk/pages/images/garage_closed.png"
+ return "/kiosk/images/garage_closed.png"
elif state == "opening":
- return "/kiosk/pages/images/garage_opening.png"
+ return "/kiosk/images/garage_opening.png"
elif state == "closing":
- return "/kiosk/pages/images/garage_closing.png"
+ return "/kiosk/images/garage_closing.png"
else:
return str(state) + ", an unknown state for the door."
--- /dev/null
+#!/usr/bin/env python3
+
+import string
+import re
+
+
+class profanity_filter:
+ def __init__(self):
+ self.arrBad = [
+ "acrotomophilia",
+ "anal",
+ "anally",
+ "anilingus",
+ "anus",
+ "arsehole",
+ "ass",
+ "asses",
+ "asshole",
+ "assmunch",
+ "auto erotic",
+ "autoerotic",
+ "babeland",
+ "baby batter",
+ "ball gag",
+ "ball gravy",
+ "ball kicking",
+ "ball licking",
+ "ball sack",
+ "ball zack",
+ "ball sucking",
+ "bangbros",
+ "bareback",
+ "barely legal",
+ "barenaked",
+ "bastardo",
+ "bastinado",
+ "bbw",
+ "bdsm",
+ "beaver cleaver",
+ "beaver lips",
+ "bestiality",
+ "bi curious",
+ "big black",
+ "big breasts",
+ "big knockers",
+ "big tits",
+ "bimbos",
+ "birdlock",
+ "bitch",
+ "bitches",
+ "black cock",
+ "blonde action",
+ "blonde on blonde",
+ "blow j",
+ "blow your l",
+ "blow ourselves",
+ "blow m",
+ "blue waffle",
+ "blumpkin",
+ "bollocks",
+ "bondage",
+ "boner",
+ "boob",
+ "boobs",
+ "booty call",
+ "breasts",
+ "brown showers",
+ "brunette action",
+ "bukkake",
+ "bulldyke",
+ "bullshit",
+ "bullet vibe",
+ "bung hole",
+ "bunghole",
+ "busty",
+ "butt",
+ "buttcheeks",
+ "butthole",
+ "camel toe",
+ "camgirl",
+ "camslut",
+ "camwhore",
+ "carpet muncher",
+ "carpetmuncher",
+ "chocolate rosebuds",
+ "circlejerk",
+ "cleveland steamer",
+ "clit",
+ "clitoris",
+ "clover clamps",
+ "clusterfuck",
+ "cock",
+ "cocks",
+ "coprolagnia",
+ "coprophilia",
+ "cornhole",
+ "creampie",
+ "cream pie",
+ "cum",
+ "cumming",
+ "cunnilingus",
+ "cunt",
+ "damn",
+ "darkie",
+ "date rape",
+ "daterape",
+ "deep throat",
+ "deepthroat",
+ "dick",
+ "dildo",
+ "dirty pillows",
+ "dirty sanchez",
+ "dog style",
+ "doggie style",
+ "doggiestyle",
+ "doggy style",
+ "doggystyle",
+ "dolcett",
+ "domination",
+ "dominatrix",
+ "dommes",
+ "donkey punch",
+ "double dick",
+ "double dong",
+ "double penetration",
+ "dp action",
+ "dtf",
+ "eat my ass",
+ "ecchi",
+ "ejaculation",
+ "erection",
+ "erotic",
+ "erotism",
+ "escort",
+ "ethical slut",
+ "eunuch",
+ "faggot",
+ "posts each week",
+ "fecal",
+ "felch",
+ "fellatio",
+ "feltch",
+ "female squirting",
+ "femdom",
+ "figging",
+ "fingering",
+ "fisting",
+ "foot fetish",
+ "footjob",
+ "frotting",
+ "fuck",
+ "fucking",
+ "fuckin",
+ "fuckin'",
+ "fucked",
+ "fuckers",
+ "fuck buttons",
+ "fuckhead",
+ "fudge packer",
+ "fudgepacker",
+ "futanari",
+ "g-spot",
+ "gspot",
+ "gang bang",
+ "gay sex",
+ "genitals",
+ "giant cock",
+ "girl on",
+ "girl on top",
+ "girls gone wild",
+ "goatcx",
+ "goatse",
+ "goddamn",
+ "gokkun",
+ "golden shower",
+ "goo girl",
+ "goodpoop",
+ "goregasm",
+ "grope",
+ "group sex",
+ "guro",
+ "hand job",
+ "handjob",
+ "hard core",
+ "hardcore",
+ "hentai",
+ "homoerotic",
+ "honkey",
+ "hooker",
+ "horny",
+ "hot chick",
+ "how to kill",
+ "how to murder",
+ "huge fat",
+ "humping",
+ "incest",
+ "intercourse",
+ "jack off",
+ "jail bait",
+ "jailbait",
+ "jerk off",
+ "jerking off",
+ "jigaboo",
+ "jiggaboo",
+ "jiggerboo",
+ "jizz",
+ "juggs",
+ "kike",
+ "kinbaku",
+ "kinkster",
+ "kinky",
+ "knobbing",
+ "leather restraint",
+ "lemon party",
+ "lolita",
+ "lovemaking",
+ "lpt request",
+ "make me come",
+ "male squirting",
+ "masturbate",
+ "masturbated",
+ "masturbating",
+ "menage a trois",
+ "milf",
+ "milfs",
+ "missionary position",
+ "motherfucker",
+ "mound of venus",
+ "mr hands",
+ "muff diver",
+ "muffdiving",
+ "nambla",
+ "nawashi",
+ "negro",
+ "neonazi",
+ "nig nog",
+ "nigga",
+ "nigger",
+ "nimphomania",
+ "nipple",
+ "not safe for",
+ "nsfw",
+ "nsfw images",
+ "nude",
+ "nudity",
+ "nutsack",
+ "nut sack",
+ "nympho",
+ "nymphomania",
+ "octopussy",
+ "omorashi",
+ "one night stand",
+ "onlyfans",
+ "orgasm",
+ "orgy",
+ "paedophile",
+ "panties",
+ "panty",
+ "pedobear",
+ "pedophile",
+ "pegging",
+ "pee",
+ "penis",
+ "phone sex",
+ "piss pig",
+ "pissing",
+ "pisspig",
+ "playboy",
+ "pleasure chest",
+ "pole smoker",
+ "ponyplay",
+ "poof",
+ "poop chute",
+ "poopchute",
+ "porn",
+ "pornhub",
+ "porno",
+ "pornography",
+ "prince albert",
+ "pthc",
+ "pube",
+ "pubes",
+ "pussy",
+ "pussies",
+ "queaf",
+ "queer",
+ "raghead",
+ "raging boner",
+ "rape",
+ "raping",
+ "rapist",
+ "rectum",
+ "reverse cowgirl",
+ "rimjob",
+ "rimming",
+ "rosy palm",
+ "rusty trombone",
+ "s&m",
+ "sadism",
+ "scat",
+ "schlong",
+ "scissoring",
+ "semen",
+ "sex",
+ "sexo",
+ "sexy",
+ "shaved beaver",
+ "shaved pussy",
+ "shemale",
+ "shibari",
+ "shit",
+ "shota",
+ "shrimping",
+ "slanteye",
+ "slut",
+ "smut",
+ "snatch",
+ "snowballing",
+ "sodomize",
+ "sodomy",
+ "spic",
+ "spooge",
+ "spread legs",
+ "strap on",
+ "strapon",
+ "strappado",
+ "strip club",
+ "style doggy",
+ "suck",
+ "sucks",
+ "suicide girls",
+ "sultry women",
+ "swastika",
+ "swinger",
+ "tainted love",
+ "taste my",
+ "tea bagging",
+ "threesome",
+ "throating",
+ "tied up",
+ "tight white",
+ "tit",
+ "tits",
+ "titties",
+ "titty",
+ "tongue in a",
+ "topless",
+ "tosser",
+ "towelhead",
+ "tranny",
+ "tribadism",
+ "tub girl",
+ "tubgirl",
+ "tushy",
+ "twat",
+ "twink",
+ "twinkie",
+ "undressing",
+ "upskirt",
+ "urethra play",
+ "urophilia",
+ "vagina",
+ "venus mound",
+ "vibrator",
+ "violet blue",
+ "violet wand",
+ "vorarephilia",
+ "voyeur",
+ "vulva",
+ "wank",
+ "wet dream",
+ "wetback",
+ "white power",
+ "whore",
+ "women rapping",
+ "wrapping men",
+ "wrinkled starfish",
+ "xx",
+ "xxx",
+ "yaoi",
+ "yellow showers",
+ "yiffy",
+ "zoophilia",
+ ]
+
+ def normalize(self, text: str) -> str:
+ result = text.lower()
+ result = result.replace("_", " ")
+ for x in string.punctuation:
+ result = result.replace(x, "")
+ result = re.sub(r"e?s$", "", result)
+ return result
+
+ def filter_bad_words(self, text: str) -> str:
+ badWordMask = "!@#$%!@#$%^~!@%^~@#$%!@#$%^~!"
+
+ brokenStr1 = text.split()
+ for word in brokenStr1:
+ if self.normalize(word) in self.arrBad or word in self.arrBad:
+ print(f'***** PROFANITY WORD="{word}"')
+ text = text.replace(word, badWordMask[: len(word)])
+
+ if len(brokenStr1) > 1:
+ bigrams = list(zip(brokenStr1, brokenStr1[1:]))
+ for bigram in bigrams:
+ phrase = f"{bigram[0]} {bigram[1]}"
+ if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
+ print(f'***** PROFANITY PHRASE="{phrase}"')
+ text = text.replace(bigram[0], badWordMask[: len(bigram[0])])
+ text = text.replace(bigram[1], badWordMask[: len(bigram[1])])
+
+ if len(brokenStr1) > 2:
+ trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
+ for trigram in trigrams:
+ phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
+ if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
+ print(f'***** PROFANITY PHRASE="{phrase}"')
+ text = text.replace(trigram[0], badWordMask[: len(trigram[0])])
+ text = text.replace(trigram[1], badWordMask[: len(trigram[1])])
+ text = text.replace(trigram[2], badWordMask[: len(trigram[2])])
+ return text
+
+ def contains_bad_words(self, text: str) -> bool:
+ brokenStr1 = text.split()
+ for word in brokenStr1:
+ if self.normalize(word) in self.arrBad or word in self.arrBad:
+ print(f'***** PROFANITY WORD="{word}"')
+ return True
+
+ if len(brokenStr1) > 1:
+ bigrams = list(zip(brokenStr1, brokenStr1[1:]))
+ for bigram in bigrams:
+ phrase = f"{bigram[0]} {bigram[1]}"
+ if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
+ print(f'***** PROFANITY PHRASE="{phrase}"')
+ return True
+
+ if len(brokenStr1) > 2:
+ trigrams = list(zip(brokenStr1, brokenStr1[1:], brokenStr1[2:]))
+ for trigram in trigrams:
+ phrase = f"{trigram[0]} {trigram[1]} {trigram[2]}"
+ if self.normalize(phrase) in self.arrBad or phrase in self.arrBad:
+ print(f'***** PROFANITY PHRASE="{phrase}"')
+ return True
+ return False
+
+
+# x = profanity_filter()
+# print(x.filter_bad_words("Fuck this auto erotic shit, it's not safe for work."))
+# print(x.contains_bad_words("cream pie their daughter."))
+# print(x.contains_bad_words("If you tell someone your penis is 6 inches it's pretty believable. If you say it's half a foot no one will believe you."))
+# print(x.normalize("dickes"));
import page_builder
import profanity_filter
import renderer
-import secrets
+import kiosk_secrets as secrets
class reddit_renderer(renderer.debuggable_abstaining_renderer):
self.font_size = font_size
self.messages = grab_bag.grab_bag()
self.filters: List[Callable[..., bool]] = [
- profanity_filter.profanity_filter().contains_bad_words
+ profanity_filter.ProfanityFilter().contains_bad_word
]
self.filters.extend(additional_filters)
self.deduper: Set[str] = set()
class til_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(til_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["todayilearned"], min_votes=200, font_size=20
+ name_to_timeout_dict, ["todayilearned"], min_votes=100, font_size=20
)
class quotes_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(quotes_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["quotes"], min_votes=200, font_size=20
+ name_to_timeout_dict, ["quotes"], min_votes=100, font_size=20
)
class showerthoughts_reddit_renderer(reddit_renderer):
@staticmethod
def dont_tell_me_about_gift_cards(msg: str) -> bool:
- return not "IMPORTANT PSA: No, you did not win a gift card" in msg
+ return "gift card" in msg
def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(showerthoughts_reddit_renderer, self).__init__(
name_to_timeout_dict,
["showerthoughts"],
- min_votes=250,
+ min_votes=150,
additional_filters=[
showerthoughts_reddit_renderer.dont_tell_me_about_gift_cards
],
class lifeprotips_reddit_renderer(reddit_renderer):
def __init__(self, name_to_timeout_dict: Dict[str, int]):
super(lifeprotips_reddit_renderer, self).__init__(
- name_to_timeout_dict, ["lifeprotips"], min_votes=100
+ name_to_timeout_dict, ["lifeprotips"], min_votes=50
)
#!/usr/bin/env python3
+import bellevue_city_calendar_renderer
import bellevue_reporter_rss_renderer
import constants
import cnn_rss_renderer
import reddit_renderer
import renderer
import seattletimes_rss_renderer
-import secrets
+import kiosk_secrets as secrets
import stevens_renderer
import stranger_renderer
import stock_renderer
import twitter_renderer
+import urbanist_renderer
import weather_renderer
import wsj_rss_renderer
always = seconds * 1
-oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
-if not oauth.has_token():
- user_code = oauth.get_user_code()
- print("------------------------------------------------------------")
- print(
- (
- 'Go to %s and enter the code "%s" (no quotes, case-sensitive)'
- % (oauth.verification_url, user_code)
- )
- )
- oauth.get_new_token()
+oauth = gdata_oauth.OAuth(secrets.google_client_secret)
# Note, the 1s updates don't really update every second; there's a max
myq_renderer.garage_door_renderer(
{"Poll MyQ": (minutes * 5), "Update Page": (always)}
),
+ bellevue_city_calendar_renderer.bellevue_city_calendar_renderer(
+ {
+ "Fetch News": (hours * 1),
+ "Shuffle News": (always),
+ },
+ 'bellevuewa.gov',
+ [ '/calendar/events.xml' ],
+ 'Bellevue City Calendar'
+ ),
bellevue_reporter_rss_renderer.bellevue_reporter_rss_renderer(
{"Fetch News": (hours * 1), "Shuffle News": (always)},
"www.bellevuereporter.com",
["/feed/"],
"Bellevue Reporter",
),
+ urbanist_renderer.urbanist_renderer(
+ {'Fetch News': (hours * 2), 'Shuffle News': (always)},
+ 'www.theurbanist.org',
+ ['/feed/'],
+ 'TheUrbanist',
+ ),
mynorthwest_rss_renderer.mynorthwest_rss_renderer(
{"Fetch News": (hours * 1), "Shuffle News": (always)},
"mynorthwest.com",
"Seattle Times Segments",
),
weather_renderer.weather_renderer(
- {"Fetch Weather (Bellevue)": (hours * 6)}, "home"
+ {"Fetch Weather (Bellevue)": (hours * 3)}, "home"
),
weather_renderer.weather_renderer(
- {"Fetch Weather (Stevens)": (hours * 6)}, "stevens"
+ {"Fetch Weather (Stevens)": (hours * 3)}, "stevens"
),
weather_renderer.weather_renderer(
- {"Fetch Weather (Telma)": (hours * 6)}, "telma"),
+ {"Fetch Weather (Telma)": (hours * 3)}, "telma"),
local_photos_mirror_renderer.local_photos_mirror_renderer(
{"Index Photos": (hours * 24), "Choose Photo": (always)}
),
def get_ticker_name(ticker: yf.ticker.Ticker) -> str:
"""Get friendly name of a ticker."""
info = ticker.get_info()
- return info["shortName"]
+ if "shortName" in info:
+ return info["shortName"]
+ return ticker
@staticmethod
def get_price(ticker: yf.ticker.Ticker) -> Optional[float]:
symbols_finished = 0
for symbol in self.symbols:
ticker = yf.Ticker(symbol)
- print(type(ticker))
# print(ticker.get_info())
if ticker is None:
self.debug_print(f"Unknown symbol {symbol} -- ignored.")
def fetch_events(self) -> bool:
self.events.clear()
feed_uris = [
- "/stranger-seattle/events/?page=1",
- "/stranger-seattle/events/?page=2",
- "/stranger-seattle/events/?page=3",
+ "/seattle/events/?page=1",
+ "/seattle/events/?page=2",
+ "/seattle/events/?page=3",
]
now = datetime.datetime.now()
ts = now + datetime.timedelta(1)
tomorrow = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append(f"/stranger-seattle/events/?start-date={tomorrow}")
+ feed_uris.append(f"/seattle/events/?start-date={tomorrow}")
delta = 5 - now.weekday()
if delta <= 0:
delta += 7
if delta > 1:
ts = now + datetime.timedelta(delta)
next_sat = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=1")
- feed_uris.append(f"/stranger-seattle/events/?start-date={next_sat}&page=2")
+ feed_uris.append(f"/seattle/events/?start-date={next_sat}&page=1")
+ feed_uris.append(f"/seattle/events/?start-date={next_sat}&page=2")
delta += 1
if delta > 1:
ts = now + datetime.timedelta(delta)
next_sun = datetime.datetime.strftime(ts, "%Y-%m-%d")
- feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=1")
- feed_uris.append(f"/stranger-seattle/events/?start-date={next_sun}&page=2")
+ feed_uris.append(f"/seattle/events/?start-date={next_sun}&page=1")
+ feed_uris.append(f"/seattle/events/?start-date={next_sun}&page=2")
- filter = profanity_filter.profanity_filter()
+ filter = profanity_filter.ProfanityFilter()
for uri in feed_uris:
try:
self.debug_print("fetching 'https://%s%s'" % (self.feed_site, uri))
soup = BeautifulSoup(raw, "html.parser")
for x in soup.find_all("div", class_="row event list-item mb-3 py-3"):
text = x.get_text()
- if filter.contains_bad_words(text):
+ if filter.contains_bad_word(text):
continue
raw_str = str(x)
raw_str = raw_str.replace(
# Test
-# x = stranger_events_renderer({"Test", 123})
-# x.periodic_render("Fetch Events")
-# x.periodic_render("Shuffle Events")
+#x = stranger_events_renderer({"Test", 123})
+#x.periodic_render("Fetch Events")
+#x.periodic_render("Shuffle Events")
--- /dev/null
+#!/usr/bin/env python3
+
+import os
+import struct
+from datetime import datetime
+from threading import Thread
+
+import numpy as np
+import pvporcupine
+import pyaudio
+import soundfile
+import speech_recognition as sr
+
+recognizer = sr.Recognizer()
+raw = audio_stream.read(porcupine.frame_length, exception_on_overflow=False)
+pcm = struct.unpack_from("h" * porcupine.frame_length, raw)
+result = porcupine.process(pcm)
+
+speech = sr.AudioData(
+ frame_data = bytes(raw),
+ sample_rate = porcupine.sample_rate,
+ sample_width = 2, # 16 bits
+)
+command = recognizer.recognize_google(speech)
+print('[%s] >>>>>>>>>>>>> Google says command was %s' % (
+ str(datetime.now()), command)
+)
import file_writer
import renderer
import profanity_filter
-import secrets
+import kiosk_secrets as secrets
class twitter_renderer(renderer.debuggable_abstaining_renderer):
self.debug = True
self.tweets_by_author: Dict[str, List[tweepy.models.Status]] = {}
self.handles_by_author: Dict[str, str] = {}
- self.filter = profanity_filter.profanity_filter()
+ self.filter = profanity_filter.ProfanityFilter()
self.urlfinder = re.compile(
"((http|https)://[\-A-Za-z0-9\\.]+/[\?\&\-A-Za-z0-9_\\.]+)"
)
for tweet in tweets:
text = tweet.full_text
if (text not in already_seen) and (
- not self.filter.contains_bad_words(text)
+ not self.filter.contains_bad_word(text)
):
already_seen.add(text)
text = self.linkify(text)
--- /dev/null
+#!/usr/bin/env python3
+
+import datetime
+import re
+from typing import Dict, List, Optional
+import xml
+
+from dateutil.parser import parse
+
+import generic_news_rss_renderer as gnrss
+
+
+# https://www.theurbanist.org/feed/
+class urbanist_renderer(gnrss.generic_news_rss_renderer):
+ """Read the TheUrbanist feed."""
+
+ def __init__(
+ self,
+ name_to_timeout_dict: Dict[str, int],
+ feed_site: str,
+ feed_uris: List[str],
+ page_title: str,
+ ):
+ super(urbanist_renderer, self).__init__(
+ name_to_timeout_dict, feed_site, feed_uris, page_title
+ )
+ self.debug = True
+
+ def debug_prefix(self) -> str:
+ return f"urbanist({self.page_title})"
+
+ def get_headlines_page_prefix(self) -> str:
+ return "urbanist"
+
+ def get_details_page_prefix(self) -> str:
+ return "urbanist-details"
+
+ def should_use_https(self) -> bool:
+ return True
+
+ def get_event_time(self, item: xml.etree.ElementTree.Element) -> Optional[datetime.datetime]:
+ return parse(self.find_pubdate(item))
+
+ def item_is_interesting_for_headlines(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ return description.lower() != 'the urbanist'
+
+ def do_details(self) -> bool:
+ return True
+
+ def item_is_interesting_for_article(
+ self, title: str, description: str, item: xml.etree.ElementTree.Element
+ ) -> bool:
+ return len(description) > 20
+
+
+# Test
+#x = urbanist_renderer(
+# {"Fetch News" : 1,
+# "Shuffle News" : 1},
+# "www.theurbanist.org",
+# [ "/feed/" ],
+# "Test" )
+#if x.fetch_news() == 0:
+# print("Error fetching news, no items fetched.")
+#else:
+# x.shuffle_news()
import file_writer
import renderer
-import secrets
+import kiosk_secrets as secrets
import random
# Icon
f.write(
- ' <tr><td colspan=3 height=100><center><img src="/kiosk/pages/images/weather/%s" height=125></center></td></tr>\n'
+ ' <tr><td colspan=3 height=100><center><img src="/kiosk/images/weather/%s" height=125></center></td></tr>\n'
% self.pick_icon(conditions[date], rain[date], snow[date])
)