X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gcal_renderer.py;h=c6daf3f746c83d1aa892aa15151c13b07476ebab;hb=09215cf1a1498c99ee75a7cbef3ea62f58a56f0d;hp=37f8c8e50671b32e2c432ead74221c01359fe58f;hpb=c06bfef53f70551e7920bc4facce27f47b89e2ba;p=kiosk.git diff --git a/gcal_renderer.py b/gcal_renderer.py index 37f8c8e..c6daf3f 100644 --- a/gcal_renderer.py +++ b/gcal_renderer.py @@ -4,21 +4,25 @@ contents of several Google calendars.""" import datetime -import gdata -import gdata_oauth -from oauth2client.client import AccessTokenRefreshError -import os +import functools +import logging import time -from typing import Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple + +from dateutil.parser import parse +import gdata_oauth +import pytz -import constants +import kiosk_constants import file_writer import globals import renderer -import secrets -class gcal_renderer(renderer.debuggable_abstaining_renderer): +logger = logging.getLogger(__name__) + + +class gcal_renderer(renderer.abstaining_renderer): """A renderer to fetch upcoming events from www.google.com/calendar""" calendar_whitelist = frozenset( @@ -34,18 +38,23 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ] ) + @functools.total_ordering class comparable_event(object): """A helper class to sort events.""" def __init__( self, - start_time: datetime.datetime, - end_time: datetime.datetime, + start_time: Optional[datetime.datetime], + end_time: Optional[datetime.datetime], summary: str, calendar: str, ) -> None: if start_time is None: assert end_time is None + else: + assert isinstance(start_time, datetime.datetime) + if end_time is not None: + assert isinstance(end_time, datetime.datetime) self.start_time = start_time self.end_time = end_time self.summary = summary @@ -63,7 +72,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): that.calendar, ) - def __str__(self) -> str: + def __eq__(self, that) -> bool: + return ( + self.start_time == that.start_time + and self.end_time == that.end_time + and self.summary == that.summary + and self.calendar == that.calendar + ) + + def __repr__(self) -> str: return "[%s] %s" % (self.timestamp(), self.friendly_name()) def friendly_name(self) -> str: @@ -72,65 +89,79 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): return "%s" % name def timestamp(self) -> str: + now = datetime.datetime.now(pytz.timezone("US/Pacific")) if self.start_time is None: return "None" - elif self.start_time.hour == 0: - return datetime.datetime.strftime(self.start_time, "%a %b %d %Y") + elif ( + self.start_time.hour == 0 + and self.start_time.minute == 0 + and self.start_time.second == 0 + ): + if self.start_time.year == now.year: + return datetime.datetime.strftime(self.start_time, "%a %b %d") + else: + return datetime.datetime.strftime(self.start_time, "%a %b %d, %Y") else: - return datetime.datetime.strftime( - self.start_time, "%a %b %d %Y %H:%M%p" + dt = self.start_time + zone = dt.tzinfo + local_dt = dt.replace(tzinfo=zone).astimezone( + tz=pytz.timezone("US/Pacific") ) + if local_dt.year == now.year: + return datetime.datetime.strftime(local_dt, "%a %b %d %I:%M%p") + else: + return datetime.datetime.strftime(local_dt, "%a %b %d, %Y %I:%M%p") def __init__( self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth ) -> None: - super(gcal_renderer, self).__init__(name_to_timeout_dict, True) + super().__init__(name_to_timeout_dict) self.oauth = oauth self.client = self.oauth.calendar_service() - self.sortable_events = [] - self.countdown_events = [] + self.sortable_events: List[gcal_renderer.comparable_event] = [] + self.countdown_events: List[gcal_renderer.comparable_event] = [] def debug_prefix(self) -> str: return "gcal" def periodic_render(self, key: str) -> bool: - self.debug_print('called for "%s"' % key) + logger.debug(f'called for "{key}"') if key == "Render Upcoming Events": return self.render_upcoming_events() elif key == "Look For Triggered Events": return self.look_for_triggered_events() else: - raise error("Unexpected operation") + raise Exception("Unexpected operation") def get_min_max_timewindow(self) -> Tuple[str, str]: - now = datetime.datetime.now() - time_min = now - datetime.timedelta(1) - time_max = now + datetime.timedelta(95) - time_min, time_max = list( - map( - lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"), - (time_min, time_max), - ) - ) - print(type(time_min)) - self.debug_print("time_min is %s" % time_min) - self.debug_print("time_max is %s" % time_max) + now = datetime.datetime.now(pytz.timezone("US/Pacific")) + _time_min = now - datetime.timedelta(hours=6) + _time_max = now + datetime.timedelta(days=95) + time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ") + time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ") + logger.debug(f"time_min is {time_min}") + logger.debug(f"time_max is {time_max}") return (time_min, time_max) @staticmethod - def parse_date(date_str: str) -> datetime.datetime: - retval = None - try: - _ = date_str.get("date") - if _: - retval = datetime.datetime.strptime(_, "%Y-%m-%d") - else: - _ = date_str.get("dateTime") - if _: - retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S") - return retval - except: - pass + def parse_date(date: Any) -> Optional[datetime.datetime]: + if isinstance(date, datetime.datetime): + return date + elif isinstance(date, dict): + if "dateTime" in date: + d = date["dateTime"] + dt = parse(d) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=None).astimezone( + tz=pytz.timezone("US/Pacific") + ) + return dt + elif "date" in date: + d = date["date"] + dt = datetime.datetime.strptime(d, "%Y-%m-%d") + dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone("US/Pacific")) + return dt + print(f"Not sure what to do with this {date} ({type(date)}), help?!") return None def get_events_from_interesting_calendars( @@ -145,9 +176,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for calendar in calendar_list["items"]: if calendar["summary"] in gcal_renderer.calendar_whitelist: - self.debug_print( - f"{calendar['summary']} is an interesting calendar..." - ) + logger.debug(f"{calendar['summary']} is an interesting calendar...") events = ( self.client.events() .list( @@ -161,12 +190,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for event in events["items"]: summary = event["summary"] - self.debug_print( - f" ... event '{summary}' ({event['start']} to {event['end']}" - ) start = gcal_renderer.parse_date(event["start"]) end = gcal_renderer.parse_date(event["end"]) + logger.debug( + f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})" + ) if start is not None and end is not None: + logger.debug(f" ... adding {summary} to sortable_events") sortable_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -177,7 +207,9 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): or "Holidays" in calendar["summary"] or "Countdown" in summary ): - self.debug_print(" ... event is countdown worthy!") + logger.debug( + f" ... adding {summary} to countdown_events" + ) countdown_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -192,49 +224,65 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): (time_min, time_max) = self.get_min_max_timewindow() try: # Populate the "Upcoming Events" page. + logger.debug("Rendering the Upcoming Events page...") ( self.sortable_events, self.countdown_events, ) = self.get_events_from_interesting_calendars(time_min, time_max) self.sortable_events.sort() with file_writer.file_writer("gcal_3_86400.html") as f: - f.write("

Upcoming Calendar Events:


\n") - f.write("
\n") + f.write( + """ +

Upcoming Calendar Events:

+
+
+
+""" + ) upcoming_sortable_events = self.sortable_events[:12] - for event in upcoming_sortable_events: + for n, event in enumerate(upcoming_sortable_events): + logger.debug(f"{n}/12: {event.friendly_name()} / {event.calendar}") + if n % 2 == 0: + color = "#c6b0b0" + else: + color = "#eeeeee" f.write( f""" - - - -\n""" + + + + +""" ) f.write("
- {event.timestamp()} - - {event.friendly_name()} -
+ {event.timestamp()} + + {event.friendly_name()} +
\n") # Populate the "Countdown" page. + logger.debug("Rendering the Countdowns page") self.countdown_events.sort() with file_writer.file_writer("countdown_3_7200.html") as g: g.write("

Countdowns:


") g.write("""" ) return True - except (gdata.service.RequestError, AccessTokenRefreshError): - print("********* TRYING TO REFRESH GCAL CLIENT *********") - self.oauth.refresh_token() - self.client = self.oauth.calendar_service() + except Exception as e: + logger.exception(e) return False - except: - raise def look_for_triggered_events(self) -> bool: - with file_writer.file_writer(constants.gcal_imminent_pagename) as f: + logger.debug("Looking for Imminent Upcoming Calendar events...") + with file_writer.file_writer(kiosk_constants.gcal_imminent_pagename) as f: f.write("

Imminent Upcoming Calendar Events:

\n
\n") f.write("
\n") - now = datetime.datetime.now() + now = datetime.datetime.now(pytz.timezone("US/Pacific")) count = 0 for event in self.sortable_events: eventstamp = event.start_time + if eventstamp is None: + continue delta = eventstamp - now x = int(delta.total_seconds()) - if x > 0 and x <= constants.seconds_per_minute * 3: - days = divmod(x, constants.seconds_per_day) - hours = divmod(days[1], constants.seconds_per_hour) - minutes = divmod(hours[1], constants.seconds_per_minute) + if x > 0 and x < 4 * kiosk_constants.seconds_per_minute: + days = divmod(x, kiosk_constants.seconds_per_day) + hours = divmod(days[1], kiosk_constants.seconds_per_hour) + minutes = divmod(hours[1], kiosk_constants.seconds_per_minute) eventstamp = event.start_time name = event.friendly_name() calendar = event.calendar + logger.debug(f"Event {event} ({name}) triggered the page.") f.write( f"
  • {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n" ) @@ -327,8 +375,10 @@ var fn = setInterval(function() { # Test -# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret) +# import kiosk_secrets as secrets +# +# oauth = gdata_oauth.OAuth(secrets.google_client_secret) # x = gcal_renderer( -# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, -# oauth) +# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, oauth +# ) # x.periodic_render("Render Upcoming Events")