#!/usr/bin/env python3 """Renders an upcoming events page and countdowns page based on the contents of several Google calendars.""" import datetime import functools import logging import time from typing import Any, Dict, List, Optional, Tuple from dateutil.parser import parse import gdata_oauth import pytz import kiosk_constants import file_writer import globals import renderer logger = logging.getLogger(__name__) class gcal_renderer(renderer.abstaining_renderer): """A renderer to fetch upcoming events from www.google.com/calendar""" calendar_whitelist = frozenset( [ "Alex's calendar", "Family", "Holidays in United States", "Lynn Gasch", "Lynn's Work", "scott.gasch@gmail.com", "Scott Gasch External - Misc", "Birthdays", # <-- from g+ contacts ] ) @functools.total_ordering class comparable_event(object): """A helper class to sort events.""" def __init__( self, start_time: Optional[datetime.datetime], end_time: Optional[datetime.datetime], summary: str, calendar: str, ) -> None: if start_time is None: assert end_time is None else: assert isinstance(start_time, datetime.datetime) if end_time is not None: assert isinstance(end_time, datetime.datetime) self.start_time = start_time self.end_time = end_time self.summary = summary self.calendar = calendar def __lt__(self, that) -> bool: if self.start_time is None and that.start_time is None: return self.summary < that.summary if self.start_time is None or that.start_time is None: return self.start_time is None return (self.start_time, self.end_time, self.summary, self.calendar) < ( that.start_time, that.end_time, that.summary, that.calendar, ) def __eq__(self, that) -> bool: return ( self.start_time == that.start_time and self.end_time == that.end_time and self.summary == that.summary and self.calendar == that.calendar ) def __repr__(self) -> str: return "[%s] %s" % (self.timestamp(), self.friendly_name()) def friendly_name(self) -> str: name = self.summary name = name.replace("countdown:", "") return "%s" % name def timestamp(self) -> str: now = datetime.datetime.now(pytz.timezone("US/Pacific")) if self.start_time is None: return "None" elif ( self.start_time.hour == 0 and self.start_time.minute == 0 and self.start_time.second == 0 ): if self.start_time.year == now.year: return datetime.datetime.strftime(self.start_time, "%a %b %d") else: return datetime.datetime.strftime(self.start_time, "%a %b %d, %Y") else: dt = self.start_time zone = dt.tzinfo local_dt = dt.replace(tzinfo=zone).astimezone( tz=pytz.timezone("US/Pacific") ) if local_dt.year == now.year: return datetime.datetime.strftime(local_dt, "%a %b %d %I:%M%p") else: return datetime.datetime.strftime(local_dt, "%a %b %d, %Y %I:%M%p") def __init__( self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth ) -> None: super().__init__(name_to_timeout_dict) self.oauth = oauth self.client = self.oauth.calendar_service() self.sortable_events: List[gcal_renderer.comparable_event] = [] self.countdown_events: List[gcal_renderer.comparable_event] = [] def debug_prefix(self) -> str: return "gcal" def periodic_render(self, key: str) -> bool: logger.debug(f'called for "{key}"') if key == "Render Upcoming Events": return self.render_upcoming_events() elif key == "Look For Triggered Events": return self.look_for_triggered_events() else: raise Exception("Unexpected operation") def get_min_max_timewindow(self) -> Tuple[str, str]: now = datetime.datetime.now(pytz.timezone("US/Pacific")) _time_min = now - datetime.timedelta(hours=6) _time_max = now + datetime.timedelta(days=95) time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ") time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ") logger.debug(f"time_min is {time_min}") logger.debug(f"time_max is {time_max}") return (time_min, time_max) @staticmethod def parse_date(date: Any) -> Optional[datetime.datetime]: if isinstance(date, datetime.datetime): return date elif isinstance(date, dict): if "dateTime" in date: d = date["dateTime"] dt = parse(d) if dt.tzinfo is None: dt = dt.replace(tzinfo=None).astimezone( tz=pytz.timezone("US/Pacific") ) return dt elif "date" in date: d = date["date"] dt = datetime.datetime.strptime(d, "%Y-%m-%d") dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone("US/Pacific")) return dt print(f"Not sure what to do with this {date} ({type(date)}), help?!") return None def get_events_from_interesting_calendars( self, time_min: str, time_max: str ) -> Tuple[List[comparable_event], List[comparable_event]]: page_token = None sortable_events = [] countdown_events = [] while True: calendar_list = ( self.client.calendarList().list(pageToken=page_token).execute() ) for calendar in calendar_list["items"]: if calendar["summary"] in gcal_renderer.calendar_whitelist: logger.debug(f"******* {calendar['summary']} is an interesting calendar...") events = ( self.client.events() .list( calendarId=calendar["id"], singleEvents=True, timeMin=time_min, timeMax=time_max, maxResults=500, ) .execute() ) for event in events["items"]: summary = event["summary"] start = gcal_renderer.parse_date(event["start"]) end = gcal_renderer.parse_date(event["end"]) logger.debug( f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})" ) if start is not None and end is not None: logger.debug(f" ... adding {summary} to sortable_events") sortable_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] ) ) if ( "countdown" in summary or "Holidays" in calendar["summary"] or "Countdown" in summary ): logger.debug( f" ... adding {summary} to countdown_events" ) countdown_events.append( gcal_renderer.comparable_event( start, end, summary, calendar["summary"] ) ) page_token = calendar_list.get("nextPageToken") if not page_token: break return (sortable_events, countdown_events) def render_upcoming_events(self) -> bool: (time_min, time_max) = self.get_min_max_timewindow() try: # Populate the "Upcoming Events" page. logger.debug("Rendering the Upcoming Events page...") ( self.sortable_events, self.countdown_events, ) = self.get_events_from_interesting_calendars(time_min, time_max) self.sortable_events.sort() with file_writer.file_writer("gcal_3_86400.html") as f: f.write( """

Upcoming Calendar Events:


""" ) upcoming_sortable_events = self.sortable_events[:12] for n, event in enumerate(upcoming_sortable_events): logger.debug(f"{n}/12: {event.friendly_name()} / {event.calendar}") if n % 2 == 0: color = "#c6b0b0" else: color = "#eeeeee" f.write( f""" """ ) f.write("
{event.timestamp()} {event.friendly_name()}
\n") # Populate the "Countdown" page. logger.debug("Rendering the Countdowns page") self.countdown_events.sort() with file_writer.file_writer("countdown_3_7200.html") as g: g.write("

Countdowns:


") g.write("""" ) return True except Exception as e: logger.exception(e) return False def look_for_triggered_events(self) -> bool: logger.debug("Looking for Imminent Upcoming Calendar events...") with file_writer.file_writer(kiosk_constants.gcal_imminent_pagename) as f: f.write("

Imminent Upcoming Calendar Events:

\n
\n") f.write("
\n") now = datetime.datetime.now(pytz.timezone("US/Pacific")) count = 0 for event in self.sortable_events: eventstamp = event.start_time if eventstamp is None: continue delta = eventstamp - now x = int(delta.total_seconds()) if x > 0 and x < 4 * kiosk_constants.seconds_per_minute: days = divmod(x, kiosk_constants.seconds_per_day) hours = divmod(days[1], kiosk_constants.seconds_per_hour) minutes = divmod(hours[1], kiosk_constants.seconds_per_minute) eventstamp = event.start_time name = event.friendly_name() calendar = event.calendar logger.debug(f"Event {event} ({name}) triggered the page.") f.write( f"
  • {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n" ) count += 1 f.write("
  • ") if count > 0: globals.put("gcal_triggered", True) else: globals.put("gcal_triggered", False) return True # Test #import kiosk_secrets as secrets #import sys #logger.setLevel(logging.DEBUG) #logger.addHandler(logging.StreamHandler(sys.stdout)) #oauth = gdata_oauth.OAuth(secrets.google_client_secret) #x = gcal_renderer( # {"Render Upcoming Events": 10000, "Look For Triggered Events": 1}, oauth #) #x.periodic_render("Render Upcoming Events")