X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=gcal_renderer.py;h=19b818d8ad3309890eff6ef0d7f7b4b21ba63aed;hb=da3a11e9fcea80a7700eb54605512d331a9ec612;hp=37f8c8e50671b32e2c432ead74221c01359fe58f;hpb=c06bfef53f70551e7920bc4facce27f47b89e2ba;p=kiosk.git diff --git a/gcal_renderer.py b/gcal_renderer.py index 37f8c8e..19b818d 100644 --- a/gcal_renderer.py +++ b/gcal_renderer.py @@ -4,21 +4,25 @@ contents of several Google calendars.""" import datetime -import gdata -import gdata_oauth -from oauth2client.client import AccessTokenRefreshError -import os +import functools +import logging import time -from typing import Dict, List, Tuple +from typing import Any, Dict, List, Optional, Tuple + +from dateutil.parser import parse +import gdata_oauth +import pytz import constants import file_writer import globals import renderer -import secrets -class gcal_renderer(renderer.debuggable_abstaining_renderer): +logger = logging.getLogger(__file__) + + +class gcal_renderer(renderer.abstaining_renderer): """A renderer to fetch upcoming events from www.google.com/calendar""" calendar_whitelist = frozenset( @@ -34,18 +38,23 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ] ) + @functools.total_ordering class comparable_event(object): """A helper class to sort events.""" def __init__( self, - start_time: datetime.datetime, - end_time: datetime.datetime, + start_time: Optional[datetime.datetime], + end_time: Optional[datetime.datetime], summary: str, calendar: str, ) -> None: if start_time is None: - assert end_time is None + assert(end_time is None) + else: + assert(isinstance(start_time, datetime.datetime)) + if end_time is not None: + assert(isinstance(end_time, datetime.datetime)) self.start_time = start_time self.end_time = end_time self.summary = summary @@ -63,7 +72,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): that.calendar, ) - def __str__(self) -> str: + def __eq__(self, that) -> bool: + return ( + self.start_time == that.start_time and + self.end_time == that.end_time and + self.summary == that.summary and + self.calendar == that.calendar + ) + + def __repr__(self) -> str: return "[%s] %s" % (self.timestamp(), self.friendly_name()) def friendly_name(self) -> str: @@ -72,65 +89,85 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): return "%s" % name def timestamp(self) -> str: + now = datetime.datetime.now(pytz.timezone("US/Pacific")) if self.start_time is None: return "None" - elif self.start_time.hour == 0: - return datetime.datetime.strftime(self.start_time, "%a %b %d %Y") + elif ( + self.start_time.hour == 0 and + self.start_time.minute == 0 and + self.start_time.second == 0 + ): + if self.start_time.year == now.year: + return datetime.datetime.strftime( + self.start_time, + "%a %b %d" + ) + else: + return datetime.datetime.strftime( + self.start_time, + "%a %b %d, %Y" + ) else: - return datetime.datetime.strftime( - self.start_time, "%a %b %d %Y %H:%M%p" - ) + dt = self.start_time + zone = dt.tzinfo + local_dt = dt.replace(tzinfo=zone).astimezone(tz=pytz.timezone('US/Pacific')) + if local_dt.year == now.year: + return datetime.datetime.strftime( + local_dt, "%a %b %d %I:%M%p" + ) + else: + return datetime.datetime.strftime( + local_dt, "%a %b %d, %Y %I:%M%p" + ) def __init__( self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth ) -> None: - super(gcal_renderer, self).__init__(name_to_timeout_dict, True) + super().__init__(name_to_timeout_dict) self.oauth = oauth self.client = self.oauth.calendar_service() - self.sortable_events = [] - self.countdown_events = [] + self.sortable_events: List[gcal_renderer.comparable_event] = [] + self.countdown_events: List[gcal_renderer.comparable_event] = [] def debug_prefix(self) -> str: return "gcal" def periodic_render(self, key: str) -> bool: - self.debug_print('called for "%s"' % key) + logger.debug('called for "%s"' % key) if key == "Render Upcoming Events": return self.render_upcoming_events() elif key == "Look For Triggered Events": return self.look_for_triggered_events() else: - raise error("Unexpected operation") + raise Exception("Unexpected operation") def get_min_max_timewindow(self) -> Tuple[str, str]: - now = datetime.datetime.now() - time_min = now - datetime.timedelta(1) - time_max = now + datetime.timedelta(95) - time_min, time_max = list( - map( - lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"), - (time_min, time_max), - ) - ) - print(type(time_min)) - self.debug_print("time_min is %s" % time_min) - self.debug_print("time_max is %s" % time_max) + now = datetime.datetime.now(pytz.timezone("US/Pacific")) + _time_min = now - datetime.timedelta(hours=6) + _time_max = now + datetime.timedelta(days=95) + time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ") + time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ") + logger.debug(f"time_min is {time_min}") + logger.debug(f"time_max is {time_max}") return (time_min, time_max) @staticmethod - def parse_date(date_str: str) -> datetime.datetime: - retval = None - try: - _ = date_str.get("date") - if _: - retval = datetime.datetime.strptime(_, "%Y-%m-%d") - else: - _ = date_str.get("dateTime") - if _: - retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S") - return retval - except: - pass + def parse_date(date: Any) -> Optional[datetime.datetime]: + if isinstance(date, datetime.datetime): + return date + elif isinstance(date, dict): + if 'dateTime' in date: + d = date['dateTime'] + dt = parse(d) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific')) + return dt + elif 'date' in date: + d = date['date'] + dt = datetime.datetime.strptime(d, '%Y-%m-%d') + dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific')) + return dt + print(f'Not sure what to do with this {date} ({type(date)}), help?!') return None def get_events_from_interesting_calendars( @@ -145,7 +182,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for calendar in calendar_list["items"]: if calendar["summary"] in gcal_renderer.calendar_whitelist: - self.debug_print( + logger.debug( f"{calendar['summary']} is an interesting calendar..." ) events = ( @@ -161,12 +198,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) for event in events["items"]: summary = event["summary"] - self.debug_print( - f" ... event '{summary}' ({event['start']} to {event['end']}" - ) start = gcal_renderer.parse_date(event["start"]) end = gcal_renderer.parse_date(event["end"]) + logger.debug( + f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})" + ) if start is not None and end is not None: + logger.debug(f' ... adding {summary} to sortable_events') sortable_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -177,7 +215,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): or "Holidays" in calendar["summary"] or "Countdown" in summary ): - self.debug_print(" ... event is countdown worthy!") + logger.debug(f" ... adding {summary} to countdown_events") countdown_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] @@ -198,20 +236,32 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): ) = self.get_events_from_interesting_calendars(time_min, time_max) self.sortable_events.sort() with file_writer.file_writer("gcal_3_86400.html") as f: - f.write("

Upcoming Calendar Events:


\n") - f.write("
\n") + f.write( +f""" +

Upcoming Calendar Events:

+
+
+
+""" + ) upcoming_sortable_events = self.sortable_events[:12] - for event in upcoming_sortable_events: + for n, event in enumerate(upcoming_sortable_events): + logger.debug(f'{n}/12: {event.friendly_name()} / {event.calendar}') + if n % 2 == 0: + color = "#c6b0b0" + else: + color = "#eeeeee" f.write( - f""" - - - -\n""" +f""" + + + + +""" ) f.write("
- {event.timestamp()} - - {event.friendly_name()} -
+ {event.timestamp()} + + {event.friendly_name()} +
\n") @@ -219,12 +269,14 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer): self.countdown_events.sort() with file_writer.file_writer("countdown_3_7200.html") as g: g.write("

Countdowns:


") g.write("""" ) return True - except (gdata.service.RequestError, AccessTokenRefreshError): + except Exception as e: print("********* TRYING TO REFRESH GCAL CLIENT *********") - self.oauth.refresh_token() - self.client = self.oauth.calendar_service() +# self.oauth.refresh_token() +# self.client = self.oauth.calendar_service() return False except: raise @@ -301,10 +353,12 @@ var fn = setInterval(function() { with file_writer.file_writer(constants.gcal_imminent_pagename) as f: f.write("

Imminent Upcoming Calendar Events:

\n
\n") f.write("
\n") - now = datetime.datetime.now() + now = datetime.datetime.now(pytz.timezone("US/Pacific")) count = 0 for event in self.sortable_events: eventstamp = event.start_time + if eventstamp is None: + return False delta = eventstamp - now x = int(delta.total_seconds()) if x > 0 and x <= constants.seconds_per_minute * 3: @@ -327,8 +381,8 @@ var fn = setInterval(function() { # Test -# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret) -# x = gcal_renderer( -# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, -# oauth) -# x.periodic_render("Render Upcoming Events") +#oauth = gdata_oauth.OAuth(secrets.google_client_secret) +#x = gcal_renderer( +# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, +# oauth) +#x.periodic_render("Render Upcoming Events")