X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gcal_renderer.py;h=c6daf3f746c83d1aa892aa15151c13b07476ebab;hb=09215cf1a1498c99ee75a7cbef3ea62f58a56f0d;hp=37f8c8e50671b32e2c432ead74221c01359fe58f;hpb=c06bfef53f70551e7920bc4facce27f47b89e2ba;p=kiosk.git diff --git a/gcal_renderer.py b/gcal_renderer.py index 37f8c8e..c6daf3f 100644 --- a/gcal_renderer.py +++ b/gcal_renderer.py @@ -4,21 +4,25 @@ contents of several Google calendars.""" import datetime -import gdata -import gdata_oauth -from oauth2client.client import AccessTokenRefreshError -import os +import functools +import logging import time -from typing import Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple + +from dateutil.parser import parse +import gdata_oauth +import pytz -import constants +import kiosk_constants import file_writer import globals import renderer -import secrets -class gcal_renderer(renderer.debuggable_abstaining_renderer): +logger = logging.getLogger(__name__) + + +class gcal_renderer(renderer.abstaining_renderer): """A renderer to fetch upcoming events from www.google.com/calendar""" calendar_whitelist = frozenset( @@ -34,18 +38,23 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ] ) + @functools.total_ordering class comparable_event(object): """A helper class to sort events.""" def __init__( self, - start_time: datetime.datetime, - end_time: datetime.datetime, + start_time: Optional[datetime.datetime], + end_time: Optional[datetime.datetime], summary: str, calendar: str, ) -> None: if start_time is None: assert end_time is None + else: + assert isinstance(start_time, datetime.datetime) + if end_time is not None: + assert isinstance(end_time, datetime.datetime) self.start_time = start_time self.end_time = end_time self.summary = summary @@ -63,7 +72,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): that.calendar, ) - def __str__(self) -> str: + def __eq__(self, that) -> bool: + return ( + self.start_time == that.start_time + and self.end_time == that.end_time + and self.summary == that.summary + and self.calendar == that.calendar + ) + + def __repr__(self) -> str: return "[%s] %s" % (self.timestamp(), self.friendly_name()) def friendly_name(self) -> str: @@ -72,65 +89,79 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): return "%s" % name def timestamp(self) -> str: + now = datetime.datetime.now(pytz.timezone("US/Pacific")) if self.start_time is None: return "None" - elif self.start_time.hour == 0: - return datetime.datetime.strftime(self.start_time, "%a %b %d %Y") + elif ( + self.start_time.hour == 0 + and self.start_time.minute == 0 + and self.start_time.second == 0 + ): + if self.start_time.year == now.year: + return datetime.datetime.strftime(self.start_time, "%a %b %d") + else: + return datetime.datetime.strftime(self.start_time, "%a %b %d, %Y") else: - return datetime.datetime.strftime( - self.start_time, "%a %b %d %Y %H:%M%p" + dt = self.start_time + zone = dt.tzinfo + local_dt = dt.replace(tzinfo=zone).astimezone( + tz=pytz.timezone("US/Pacific") ) + if local_dt.year == now.year: + return datetime.datetime.strftime(local_dt, "%a %b %d %I:%M%p") + else: + return datetime.datetime.strftime(local_dt, "%a %b %d, %Y %I:%M%p") def __init__( self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth ) -> None: - super(gcal_renderer, self).__init__(name_to_timeout_dict, True) + super().__init__(name_to_timeout_dict) self.oauth = oauth self.client = self.oauth.calendar_service() - self.sortable_events = [] - self.countdown_events = [] + self.sortable_events: List[gcal_renderer.comparable_event] = [] + self.countdown_events: List[gcal_renderer.comparable_event] = [] def debug_prefix(self) -> str: return "gcal" def periodic_render(self, key: str) -> bool: - self.debug_print('called for "%s"' % key) + logger.debug(f'called for "{key}"') if key == "Render Upcoming Events": return self.render_upcoming_events() elif key == "Look For Triggered Events": return self.look_for_triggered_events() else: - raise error("Unexpected operation") + raise Exception("Unexpected operation") def get_min_max_timewindow(self) -> Tuple[str, str]: - now = datetime.datetime.now() - time_min = now - datetime.timedelta(1) - time_max = now + datetime.timedelta(95) - time_min, time_max = list( - map( - lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"), - (time_min, time_max), - ) - ) - print(type(time_min)) - self.debug_print("time_min is %s" % time_min) - self.debug_print("time_max is %s" % time_max) + now = datetime.datetime.now(pytz.timezone("US/Pacific")) + _time_min = now - datetime.timedelta(hours=6) + _time_max = now + datetime.timedelta(days=95) + time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ") + time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ") + logger.debug(f"time_min is {time_min}") + logger.debug(f"time_max is {time_max}") return (time_min, time_max) @staticmethod - def parse_date(date_str: str) -> datetime.datetime: - retval = None - try: - _ = date_str.get("date") - if _: - retval = datetime.datetime.strptime(_, "%Y-%m-%d") - else: - _ = date_str.get("dateTime") - if _: - retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S") - return retval - except: - pass + def parse_date(date: Any) -> Optional[datetime.datetime]: + if isinstance(date, datetime.datetime): + return date + elif isinstance(date, dict): + if "dateTime" in date: + d = date["dateTime"] + dt = parse(d) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=None).astimezone( + tz=pytz.timezone("US/Pacific") + ) + return dt + elif "date" in date: + d = date["date"] + dt = datetime.datetime.strptime(d, "%Y-%m-%d") + dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone("US/Pacific")) + return dt + print(f"Not sure what to do with this {date} ({type(date)}), help?!") return None def get_events_from_interesting_calendars( @@ -145,9 +176,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for calendar in calendar_list["items"]: if calendar["summary"] in gcal_renderer.calendar_whitelist: - self.debug_print( - f"{calendar['summary']} is an interesting calendar..." - ) + logger.debug(f"{calendar['summary']} is an interesting calendar...") events = ( self.client.events() .list( @@ -161,12 +190,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for event in events["items"]: summary = event["summary"] - self.debug_print( - f" ... event '{summary}' ({event['start']} to {event['end']}" - ) start = gcal_renderer.parse_date(event["start"]) end = gcal_renderer.parse_date(event["end"]) + logger.debug( + f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})" + ) if start is not None and end is not None: + logger.debug(f" ... adding {summary} to sortable_events") sortable_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -177,7 +207,9 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): or "Holidays" in calendar["summary"] or "Countdown" in summary ): - self.debug_print(" ... event is countdown worthy!") + logger.debug( + f" ... adding {summary} to countdown_events" + ) countdown_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -192,49 +224,65 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): (time_min, time_max) = self.get_min_max_timewindow() try: # Populate the "Upcoming Events" page. + logger.debug("Rendering the Upcoming Events page...") ( self.sortable_events, self.countdown_events, ) = self.get_events_from_interesting_calendars(time_min, time_max) self.sortable_events.sort() with file_writer.file_writer("gcal_3_86400.html") as f: - f.write("
- {event.timestamp()} - | -- {event.friendly_name()} - | -
+ {event.timestamp()} + | ++ {event.friendly_name()} + | +