#!/usr/bin/env python3

"""Renders an upcoming events page and countdowns page based on the
contents of several Google calendars."""

# NOTE(review): this file was reconstructed from a copy whose line breaks
# had been collapsed and whose HTML markup inside string literals had been
# stripped.  Every place where markup (or logic) had to be re-created is
# tagged with NOTE(review) below; confirm those against the original.

import datetime
import gdata  # type: ignore
import gdata_oauth
import html
from oauth2client.client import AccessTokenRefreshError  # type: ignore
import os
import time
from typing import Any, Dict, List, Optional, Tuple

import constants
import file_writer
import globals
import renderer
import secrets


class gcal_renderer(renderer.debuggable_abstaining_renderer):
    """A renderer to fetch upcoming events from www.google.com/calendar"""

    # Only calendars whose "summary" (display name) appears here are read.
    calendar_whitelist = frozenset(
        [
            "Alex's calendar",
            "Family",
            "Holidays in United States",
            "Lynn Gasch",
            "Lynn's Work",
            "scott.gasch@gmail.com",
            "Scott Gasch External - Misc",
            "Birthdays",  # <-- from g+ contacts
        ]
    )

    class comparable_event(object):
        """A helper class to sort events.

        Events order by (start_time, end_time, summary, calendar).  An event
        without a start time sorts ahead of every event that has one; two
        events without start times order by summary.
        """

        def __init__(
            self,
            start_time: Optional[datetime.datetime],
            end_time: Optional[datetime.datetime],
            summary: str,
            calendar: str,
        ) -> None:
            # An event with no start must not have an end either.
            if start_time is None:
                assert end_time is None
            self.start_time = start_time
            self.end_time = end_time
            self.summary = summary
            self.calendar = calendar

        def __lt__(self, that) -> bool:
            if self.start_time is None and that.start_time is None:
                return self.summary < that.summary
            if self.start_time is None or that.start_time is None:
                # Exactly one side is undated; the undated one goes first.
                return self.start_time is None
            return (self.start_time, self.end_time, self.summary, self.calendar) < (
                that.start_time,
                that.end_time,
                that.summary,
                that.calendar,
            )

        def __str__(self) -> str:
            return "[%s] %s" % (self.timestamp(), self.friendly_name())

        def friendly_name(self) -> str:
            """Return the summary with any "countdown:" marker removed."""
            return self.summary.replace("countdown:", "")

        def timestamp(self) -> str:
            """Return a human-readable start time.

            Returns the string "None" when there is no start time, a date
            only when the start hour is 0, and a date plus time otherwise.
            """
            if self.start_time is None:
                return "None"
            if self.start_time.hour == 0:
                return self.start_time.strftime("%a %b %d %Y")
            # The original format was "%H:%M%p", which pairs a 24-hour clock
            # with an AM/PM marker (e.g. "14:30PM"); %I is the 12-hour field
            # that belongs with %p.
            return self.start_time.strftime("%a %b %d %Y %I:%M%p")

    def __init__(
        self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
    ) -> None:
        super().__init__(name_to_timeout_dict, True)
        self.oauth = oauth
        self.client = self.oauth.calendar_service()
        # Filled in by render_upcoming_events(); read by
        # look_for_triggered_events().
        self.sortable_events: List[gcal_renderer.comparable_event] = []
        self.countdown_events: List[gcal_renderer.comparable_event] = []

    def debug_prefix(self) -> str:
        return "gcal"

    def periodic_render(self, key: str) -> bool:
        """Dispatch a named periodic operation.

        Raises:
            Exception: if key is not one of the two known operation names.
        """
        self.debug_print('called for "%s"' % key)
        if key == "Render Upcoming Events":
            return self.render_upcoming_events()
        elif key == "Look For Triggered Events":
            return self.look_for_triggered_events()
        else:
            raise Exception("Unexpected operation")

    def get_min_max_timewindow(self) -> Tuple[str, str]:
        """Return (time_min, time_max) strings spanning from one day ago to
        95 days ahead, formatted as "%Y-%m-%dT%H:%M:%SZ"."""
        # The trailing "Z" declares the timestamp to be UTC, so the clock
        # reading must be UTC too (the original formatted local time with a
        # "Z" suffix, skewing the window by the local UTC offset).
        now = datetime.datetime.now(datetime.timezone.utc)
        stamp_format = "%Y-%m-%dT%H:%M:%SZ"
        time_min = (now - datetime.timedelta(1)).strftime(stamp_format)
        time_max = (now + datetime.timedelta(95)).strftime(stamp_format)
        self.debug_print(f"time_min is {time_min}")
        self.debug_print(f"time_max is {time_max}")
        return (time_min, time_max)

    @staticmethod
    def parse_date(date_str: Dict[str, Any]) -> Optional[datetime.datetime]:
        """Parse an event's "start" or "end" record into a naive datetime.

        Despite the parameter name, the argument is a mapping: its "date"
        entry ("YYYY-MM-DD") is tried first, then its "dateTime" entry, whose
        UTC offset is discarded.

        Returns:
            The parsed datetime, or None if neither entry is present or the
            value cannot be parsed.
        """
        try:
            date_only = date_str.get("date")
            if date_only:
                return datetime.datetime.strptime(date_only, "%Y-%m-%d")
            date_time = date_str.get("dateTime")
            if date_time:
                # "YYYY-MM-DDTHH:MM:SS" is always the first 19 characters.
                # Slicing from the front (instead of dropping the last 6)
                # also copes with a "Z" suffix or fractional seconds.
                return datetime.datetime.strptime(
                    date_time[:19], "%Y-%m-%dT%H:%M:%S"
                )
        except (AttributeError, TypeError, ValueError):
            # Not a mapping, a non-string value, or an unparseable string:
            # treat the time as unknown rather than failing the render.
            return None
        return None

    def get_events_from_interesting_calendars(
        self, time_min: str, time_max: str
    ) -> Tuple[List[comparable_event], List[comparable_event]]:
        """Fetch events between time_min and time_max from every whitelisted
        calendar.

        Returns:
            (sortable_events, countdown_events).  Both lists are unsorted and
            contain only events whose start and end could be parsed.  An
            event is also a countdown event when its summary contains
            "countdown" or "Countdown", or its calendar name contains
            "Holidays".
        """
        sortable_events: List[gcal_renderer.comparable_event] = []
        countdown_events: List[gcal_renderer.comparable_event] = []
        page_token = None
        while True:
            calendar_list = (
                self.client.calendarList().list(pageToken=page_token).execute()
            )
            for calendar in calendar_list["items"]:
                calendar_name = calendar["summary"]
                if calendar_name not in gcal_renderer.calendar_whitelist:
                    continue
                self.debug_print(f"{calendar_name} is an interesting calendar...")
                # NOTE(review): only the first page (at most 50 events) of
                # each calendar is read.
                events = (
                    self.client.events()
                    .list(
                        calendarId=calendar["id"],
                        singleEvents=True,
                        timeMin=time_min,
                        timeMax=time_max,
                        maxResults=50,
                    )
                    .execute()
                )
                for event in events["items"]:
                    # An untitled event has no "summary" key; do not let one
                    # such event abort the whole render with a KeyError.
                    summary = event.get("summary") or "(No title)"
                    self.debug_print(
                        f" ... event '{summary}' ({event['start']} to {event['end']}"
                    )
                    start = gcal_renderer.parse_date(event["start"])
                    end = gcal_renderer.parse_date(event["end"])
                    if start is None or end is None:
                        continue
                    entry = gcal_renderer.comparable_event(
                        start, end, summary, calendar_name
                    )
                    sortable_events.append(entry)
                    # NOTE(review): the collapsed source does not show
                    # whether this check was nested under the successful
                    # parse above.  It is nested here because a countdown
                    # needs a start time to count down to.
                    if (
                        "countdown" in summary
                        or "Holidays" in calendar_name
                        or "Countdown" in summary
                    ):
                        self.debug_print(" ... event is countdown worthy!")
                        countdown_events.append(
                            gcal_renderer.comparable_event(
                                start, end, summary, calendar_name
                            )
                        )
            page_token = calendar_list.get("nextPageToken")
            if not page_token:
                break
        return (sortable_events, countdown_events)

    def render_upcoming_events(self) -> bool:
        """Refresh the cached event lists and rewrite both pages.

        Returns:
            True on success.  False if the calendar request failed with
            gdata.service.RequestError or AccessTokenRefreshError, in which
            case the OAuth token is refreshed and the client rebuilt.  Any
            other exception propagates to the caller.
        """
        (time_min, time_max) = self.get_min_max_timewindow()
        try:
            # Populate the "Upcoming Events" page.
            (
                self.sortable_events,
                self.countdown_events,
            ) = self.get_events_from_interesting_calendars(time_min, time_max)
            self.sortable_events.sort()
            self._write_upcoming_page()

            # Populate the "Countdown" page.
            self.countdown_events.sort()
            self._write_countdown_page()
            return True
        except (gdata.service.RequestError, AccessTokenRefreshError):
            print("********* TRYING TO REFRESH GCAL CLIENT *********")
            self.oauth.refresh_token()
            self.client = self.oauth.calendar_service()
            return False

    def _write_upcoming_page(self) -> None:
        """Write the first 12 sorted events to gcal_3_86400.html."""
        # NOTE(review): the HTML tags in these strings were lost from the
        # source; the markup below is a reconstruction (heading + table).
        with file_writer.file_writer("gcal_3_86400.html") as f:
            f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
            f.write("<table>\n")
            for event in self.sortable_events[:12]:
                # Event titles come from calendar data; escape them so a
                # title cannot inject markup into the page.
                title = html.escape(event.friendly_name())
                f.write(
                    f"""<tr><td>{event.timestamp()}</td><td>{title}</td></tr>\n"""
                )
            f.write("</table>\n")

    def _write_countdown_page(self) -> None:
        """Write one line per future countdown event to
        countdown_3_7200.html."""
        # NOTE(review): in the source only the "Countdowns:" header write and
        # one trailing write survive; the per-event rendering between them
        # was lost.  The list below is a minimal reconstruction -- restore
        # the original rendering if it is available.
        now = datetime.datetime.now()
        with file_writer.file_writer("countdown_3_7200.html") as g:
            g.write("<h1>Countdowns:</h1><hr><ul>\n")
            for event in self.countdown_events:
                if event.start_time is None:
                    continue
                remaining = int((event.start_time - now).total_seconds())
                if remaining <= 0:
                    continue  # already started
                days, rest = divmod(remaining, constants.seconds_per_day)
                hours, rest = divmod(rest, constants.seconds_per_hour)
                minutes = rest // constants.seconds_per_minute
                title = html.escape(event.friendly_name())
                g.write(
                    f"<li>{days} days, {hours:02d}:{minutes:02d} until {title}</li>\n"
                )
            g.write("</ul>\n")

    def look_for_triggered_events(self) -> bool:
        """Write the "imminent events" page and publish whether any exist.

        An event is imminent when it starts more than 0 seconds and at most
        three minutes from now.  Sets the "gcal_triggered" global to True if
        at least one cached event is imminent, else False.  Always returns
        True.
        """
        now = datetime.datetime.now()
        count = 0
        # NOTE(review): the HTML tags in these strings were lost from the
        # source; the markup below is a reconstruction (heading + list).
        with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
            f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
            f.write("<ul>\n")
            for event in self.sortable_events:
                if event.start_time is None:
                    # The original returned False here, leaving the page
                    # half-written and "gcal_triggered" unset; skip instead.
                    continue
                seconds_until = int((event.start_time - now).total_seconds())
                if 0 < seconds_until <= constants.seconds_per_minute * 3:
                    # At most three minutes remain, so whole minutes are all
                    # that is needed (the original's day/hour divmod chain
                    # yields the same value for a span this short).
                    minutes = seconds_until // constants.seconds_per_minute
                    name = html.escape(event.friendly_name())
                    calendar = html.escape(event.calendar)
                    f.write(
                        f"<li>{name} ({calendar}) upcoming in {minutes} minutes.</li>\n"
                    )
                    count += 1
            f.write("</ul>")
        globals.put("gcal_triggered", count > 0)
        return True


# Test
# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
# x = gcal_renderer(
#     {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
#     oauth)
# x.periodic_render("Render Upcoming Events")