-from oauth2client.client import AccessTokenRefreshError
-import constants
+#!/usr/bin/env python3
+
+"""Renders an upcoming events page and countdowns page based on the
+contents of several Google calendars."""
+
import datetime
+import gdata # type: ignore
+import gdata_oauth
+from oauth2client.client import AccessTokenRefreshError # type: ignore
+import os
+import time
+from typing import Dict, List, Optional, Tuple
+
+import constants
import file_writer
-import gdata
import globals
-import os
import renderer
-import time
+import secrets
class gcal_renderer(renderer.debuggable_abstaining_renderer):
class comparable_event(object):
"""A helper class to sort events."""
- def __init__(self, start_time, end_time, summary, calendar):
+ def __init__(
+ self,
+ start_time: Optional[datetime.datetime],
+ end_time: Optional[datetime.datetime],
+ summary: str,
+ calendar: str,
+ ) -> None:
if start_time is None:
- assert end_time is None
+ assert(end_time is None)
self.start_time = start_time
self.end_time = end_time
self.summary = summary
self.calendar = calendar
- def __lt__(self, that):
+ def __lt__(self, that) -> bool:
if self.start_time is None and that.start_time is None:
return self.summary < that.summary
if self.start_time is None or that.start_time is None:
that.calendar,
)
- def __str__(self):
+ def __str__(self) -> str:
return "[%s] %s" % (self.timestamp(), self.friendly_name())
- def friendly_name(self):
+ def friendly_name(self) -> str:
name = self.summary
name = name.replace("countdown:", "")
return "<B>%s</B>" % name
- def timestamp(self):
+ def timestamp(self) -> str:
if self.start_time is None:
return "None"
elif self.start_time.hour == 0:
self.start_time, "%a %b %d %Y %H:%M%p"
)
- def __init__(self, name_to_timeout_dict, oauth):
+ def __init__(
+ self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
+ ) -> None:
super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
self.oauth = oauth
self.client = self.oauth.calendar_service()
- self.sortable_events = []
- self.countdown_events = []
+ self.sortable_events: List[gcal_renderer.comparable_event] = []
+ self.countdown_events: List[gcal_renderer.comparable_event] = []
- def debug_prefix(self):
+ def debug_prefix(self) -> str:
return "gcal"
- def periodic_render(self, key):
+ def periodic_render(self, key: str) -> bool:
self.debug_print('called for "%s"' % key)
if key == "Render Upcoming Events":
return self.render_upcoming_events()
elif key == "Look For Triggered Events":
return self.look_for_triggered_events()
else:
- raise error("Unexpected operation")
-
- def render_upcoming_events(self):
- page_token = None
-
- def format_datetime(x):
- return datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ")
+ raise Exception("Unexpected operation")
+ def get_min_max_timewindow(self) -> Tuple[str, str]:
now = datetime.datetime.now()
- time_min = now - datetime.timedelta(1)
- time_max = now + datetime.timedelta(95)
- time_min, time_max = list(map(format_datetime, (time_min, time_max)))
- self.debug_print("time_min is %s" % time_min)
- self.debug_print("time_max is %s" % time_max)
-
- # Writes 2 files:
- # + "upcoming events",
- # + a countdown timer for a subser of events,
- f = file_writer.file_writer("gcal_3_86400.html")
- f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
- f.write("<center><table width=96%>\n")
-
- g = file_writer.file_writer("countdown_3_7200.html")
- g.write("<h1>Countdowns:</h1><hr><ul>\n")
+ _time_min = now - datetime.timedelta(1)
+ _time_max = now + datetime.timedelta(95)
+ time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ")
+ time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ")
+ self.debug_print(f"time_min is {time_min}")
+ self.debug_print(f"time_max is {time_max}")
+ return (time_min, time_max)
+ @staticmethod
+ def parse_date(date_str: str) -> Optional[datetime.datetime]:
+ retval = None
try:
- self.sortable_events = []
- self.countdown_events = []
- while True:
- calendar_list = (
- self.client.calendarList().list(pageToken=page_token).execute()
- )
- for calendar in calendar_list["items"]:
- if calendar["summary"] in gcal_renderer.calendar_whitelist:
- events = (
- self.client.events()
- .list(
- calendarId=calendar["id"],
- singleEvents=True,
- timeMin=time_min,
- timeMax=time_max,
- maxResults=50,
- )
- .execute()
- )
-
- def parse_date(x):
- y = x.get("date")
- if y:
- y = datetime.datetime.strptime(y, "%Y-%m-%d")
- else:
- y = x.get("dateTime")
- if y:
- y = datetime.datetime.strptime(
- y[:-6], "%Y-%m-%dT%H:%M:%S"
- )
- else:
- y = None
- return y
+ _ = date_str.get("date")
+ if _:
+ retval = datetime.datetime.strptime(_, "%Y-%m-%d")
+ else:
+ _ = date_str.get("dateTime")
+ if _:
+ retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
+ return retval
+ except:
+ pass
+ return None
- for event in events["items"]:
- try:
- summary = event["summary"]
- self.debug_print(
- "event '%s' (%s to %s)"
- % (summary, event["start"], event["end"])
+ def get_events_from_interesting_calendars(
+ self, time_min: str, time_max: str
+ ) -> Tuple[List[comparable_event], List[comparable_event]]:
+ page_token = None
+ sortable_events = []
+ countdown_events = []
+ while True:
+ calendar_list = (
+ self.client.calendarList().list(pageToken=page_token).execute()
+ )
+ for calendar in calendar_list["items"]:
+ if calendar["summary"] in gcal_renderer.calendar_whitelist:
+ self.debug_print(
+ f"{calendar['summary']} is an interesting calendar..."
+ )
+ events = (
+ self.client.events()
+ .list(
+ calendarId=calendar["id"],
+ singleEvents=True,
+ timeMin=time_min,
+ timeMax=time_max,
+ maxResults=50,
+ )
+ .execute()
+ )
+ for event in events["items"]:
+ summary = event["summary"]
+ self.debug_print(
+ f" ... event '{summary}' ({event['start']} to {event['end']}"
+ )
+ start = gcal_renderer.parse_date(event["start"])
+ end = gcal_renderer.parse_date(event["end"])
+ if start is not None and end is not None:
+ sortable_events.append(
+ gcal_renderer.comparable_event(
+ start, end, summary, calendar["summary"]
)
- start = parse_date(event["start"])
- end = parse_date(event["end"])
- self.sortable_events.append(
+ )
+ if (
+ "countdown" in summary
+ or "Holidays" in calendar["summary"]
+ or "Countdown" in summary
+ ):
+ self.debug_print(" ... event is countdown worthy!")
+ countdown_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
)
)
- if (
- "countdown" in summary
- or "Holidays" in calendar["summary"]
- or "Countdown" in summary
- ):
- self.debug_print("event is countdown worthy")
- self.countdown_events.append(
- gcal_renderer.comparable_event(
- start, end, summary, calendar["summary"]
- )
- )
- except Exception as e:
- print("gcal unknown exception, skipping event.")
- else:
- self.debug_print("Skipping calendar '%s'" % calendar["summary"])
- page_token = calendar_list.get("nextPageToken")
- if not page_token:
- break
+ page_token = calendar_list.get("nextPageToken")
+ if not page_token:
+ break
+ return (sortable_events, countdown_events)
+ def render_upcoming_events(self) -> bool:
+ (time_min, time_max) = self.get_min_max_timewindow()
+ try:
+ # Populate the "Upcoming Events" page.
+ (
+ self.sortable_events,
+ self.countdown_events,
+ ) = self.get_events_from_interesting_calendars(time_min, time_max)
self.sortable_events.sort()
- upcoming_sortable_events = self.sortable_events[:12]
- for event in upcoming_sortable_events:
- self.debug_print("sorted event: %s" % event.friendly_name())
- f.write(
- """
+ with file_writer.file_writer("gcal_3_86400.html") as f:
+ f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
+ f.write("<center><table width=96%>\n")
+ upcoming_sortable_events = self.sortable_events[:12]
+ for event in upcoming_sortable_events:
+ f.write(
+ f"""
<tr>
<td style="padding-right: 1em;">
- %s
+ {event.timestamp()}
</td>
<td style="padding-left: 1em;">
- %s
+ {event.friendly_name()}
</td>
</tr>\n"""
- % (event.timestamp(), event.friendly_name())
- )
- f.write("</table></center>\n")
- f.close()
+ )
+ f.write("</table></center>\n")
+ # Populate the "Countdown" page.
self.countdown_events.sort()
- upcoming_countdown_events = self.countdown_events[:12]
- now = datetime.datetime.now()
- count = 0
- timestamps = {}
- for event in upcoming_countdown_events:
- eventstamp = event.start_time
- delta = eventstamp - now
- name = event.friendly_name()
- x = int(delta.total_seconds())
- if x > 0:
- identifier = "id%d" % count
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
- g.write(
- '<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
- % (identifier, days[0], hours[0], minutes[0], name)
- )
- timestamps[identifier] = time.mktime(eventstamp.timetuple())
- count += 1
- self.debug_print(
- "countdown to %s is %dd %dh %dm"
- % (name, days[0], hours[0], minutes[0])
- )
- g.write("</ul>")
- g.write("<SCRIPT>\nlet timestampMap = new Map([")
- for x in list(timestamps.keys()):
- g.write(' ["%s", %f],\n' % (x, timestamps[x] * 1000.0))
- g.write("]);\n\n")
- g.write(
- """
+ with file_writer.file_writer("countdown_3_7200.html") as g:
+ g.write("<h1>Countdowns:</h1><hr><ul>\n")
+ now = datetime.datetime.now()
+ upcoming_countdown_events = self.countdown_events[:12]
+ count = 0
+ timestamps = {}
+ for event in upcoming_countdown_events:
+ eventstamp = event.start_time
+ if eventstamp is None:
+ return False
+ name = event.friendly_name()
+ delta = eventstamp - now
+ x = int(delta.total_seconds())
+ if x > 0:
+ identifier = "id%d" % count
+ days = divmod(x, constants.seconds_per_day)
+ hours = divmod(days[1], constants.seconds_per_hour)
+ minutes = divmod(hours[1], constants.seconds_per_minute)
+ g.write(
+ f'<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
+ % (
+ identifier,
+ int(days[0]),
+ int(hours[0]),
+ int(minutes[0]),
+ name,
+ )
+ )
+ timestamps[identifier] = time.mktime(eventstamp.timetuple())
+ count += 1
+ self.debug_print(
+ "countdown to %s is %dd %dh %dm"
+ % (name, days[0], hours[0], minutes[0])
+ )
+ g.write("</ul>")
+ g.write("<SCRIPT>\nlet timestampMap = new Map([")
+ for _ in list(timestamps.keys()):
+ g.write(f' ["{_}", {timestamps[_] * 1000.0}],\n')
+ g.write("]);\n\n")
+ g.write(
+ """
// Pad things with a leading zero if necessary.
function pad(n) {
return (n < 10) ? ("0" + n) : n;
}
}, 1000);
</script>"""
- )
- g.close()
+ )
return True
except (gdata.service.RequestError, AccessTokenRefreshError):
print("********* TRYING TO REFRESH GCAL CLIENT *********")
except:
raise
- def look_for_triggered_events(self):
- f = file_writer.file_writer(constants.gcal_imminent_pagename)
- f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
- f.write("<center><table width=99%>\n")
- now = datetime.datetime.now()
- count = 0
- for event in self.sortable_events:
- eventstamp = event.start_time
- delta = eventstamp - now
- x = int(delta.total_seconds())
- if x > 0 and x <= constants.seconds_per_minute * 3:
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ def look_for_triggered_events(self) -> bool:
+ with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
+ f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
+ f.write("<center><table width=99%>\n")
+ now = datetime.datetime.now()
+ count = 0
+ for event in self.sortable_events:
eventstamp = event.start_time
- name = event.friendly_name()
- calendar = event.calendar
- f.write(
- "<LI> %s (%s) upcoming in %d minutes.\n"
- % (name, calendar, minutes[0])
- )
- count += 1
- f.write("</table>")
- f.close()
+ if eventstamp is None:
+ return False
+ delta = eventstamp - now
+ x = int(delta.total_seconds())
+ if x > 0 and x <= constants.seconds_per_minute * 3:
+ days = divmod(x, constants.seconds_per_day)
+ hours = divmod(days[1], constants.seconds_per_hour)
+ minutes = divmod(hours[1], constants.seconds_per_minute)
+ eventstamp = event.start_time
+ name = event.friendly_name()
+ calendar = event.calendar
+ f.write(
+ f"<LI> {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n"
+ )
+ count += 1
+ f.write("</table>")
if count > 0:
globals.put("gcal_triggered", True)
else:
globals.put("gcal_triggered", False)
return True
+
+
+# Test
+# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
+# x = gcal_renderer(
+# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+# oauth)
+# x.periodic_render("Render Upcoming Events")