Adding type annotations and fixing up formatting.
[kiosk.git] / gcal_renderer.py
index e6657795ed91d1c057364b7f8f1ada105bf2bb1f..37f8c8e50671b32e2c432ead74221c01359fe58f 100644 (file)
@@ -1,12 +1,21 @@
+#!/usr/bin/env python3
+
+"""Renders an upcoming events page and countdowns page based on the
+contents of several Google calendars."""
+
+import datetime
+import gdata
+import gdata_oauth
 from oauth2client.client import AccessTokenRefreshError
+import os
+import time
+from typing import Dict, List, Tuple
+
 import constants
-import datetime
 import file_writer
-import gdata
 import globals
-import os
 import renderer
-import time
+import secrets
 
 
 class gcal_renderer(renderer.debuggable_abstaining_renderer):
@@ -28,7 +37,13 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
     class comparable_event(object):
         """A helper class to sort events."""
 
-        def __init__(self, start_time, end_time, summary, calendar):
+        def __init__(
+            self,
+            start_time: datetime.datetime,
+            end_time: datetime.datetime,
+            summary: str,
+            calendar: str,
+        ) -> None:
             if start_time is None:
                 assert end_time is None
             self.start_time = start_time
@@ -36,7 +51,7 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
             self.summary = summary
             self.calendar = calendar
 
-        def __lt__(self, that):
+        def __lt__(self, that) -> bool:
             if self.start_time is None and that.start_time is None:
                 return self.summary < that.summary
             if self.start_time is None or that.start_time is None:
@@ -48,15 +63,15 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
                 that.calendar,
             )
 
-        def __str__(self):
+        def __str__(self) -> str:
             return "[%s]&nbsp;%s" % (self.timestamp(), self.friendly_name())
 
-        def friendly_name(self):
+        def friendly_name(self) -> str:
             name = self.summary
             name = name.replace("countdown:", "")
             return "<B>%s</B>" % name
 
-        def timestamp(self):
+        def timestamp(self) -> str:
             if self.start_time is None:
                 return "None"
             elif self.start_time.hour == 0:
@@ -66,17 +81,19 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
                     self.start_time, "%a %b %d %Y %H:%M%p"
                 )
 
-    def __init__(self, name_to_timeout_dict, oauth):
+    def __init__(
+        self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
+    ) -> None:
         super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
         self.oauth = oauth
         self.client = self.oauth.calendar_service()
         self.sortable_events = []
         self.countdown_events = []
 
-    def debug_prefix(self):
+    def debug_prefix(self) -> str:
         return "gcal"
 
-    def periodic_render(self, key):
+    def periodic_render(self, key: str) -> bool:
         self.debug_print('called for "%s"' % key)
         if key == "Render Upcoming Events":
             return self.render_upcoming_events()
@@ -85,148 +102,160 @@ class gcal_renderer(renderer.debuggable_abstaining_renderer):
         else:
             raise error("Unexpected operation")
 
-    def render_upcoming_events(self):
-        page_token = None
-
-        def format_datetime(x):
-            return datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ")
-
+    def get_min_max_timewindow(self) -> Tuple[str, str]:
         now = datetime.datetime.now()
         time_min = now - datetime.timedelta(1)
         time_max = now + datetime.timedelta(95)
-        time_min, time_max = list(map(format_datetime, (time_min, time_max)))
+        time_min, time_max = list(
+            map(
+                lambda x: datetime.datetime.strftime(x, "%Y-%m-%dT%H:%M:%SZ"),
+                (time_min, time_max),
+            )
+        )
+        print(type(time_min))
         self.debug_print("time_min is %s" % time_min)
         self.debug_print("time_max is %s" % time_max)
+        return (time_min, time_max)
 
-        # Writes 2 files:
-        #  + "upcoming events",
-        #  + a countdown timer for a subser of events,
-        f = file_writer.file_writer("gcal_3_86400.html")
-        f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
-        f.write("<center><table width=96%>\n")
-
-        g = file_writer.file_writer("countdown_3_7200.html")
-        g.write("<h1>Countdowns:</h1><hr><ul>\n")
-
+    @staticmethod
+    def parse_date(date_str: str) -> datetime.datetime:
+        retval = None
         try:
-            self.sortable_events = []
-            self.countdown_events = []
-            while True:
-                calendar_list = (
-                    self.client.calendarList().list(pageToken=page_token).execute()
-                )
-                for calendar in calendar_list["items"]:
-                    if calendar["summary"] in gcal_renderer.calendar_whitelist:
-                        events = (
-                            self.client.events()
-                            .list(
-                                calendarId=calendar["id"],
-                                singleEvents=True,
-                                timeMin=time_min,
-                                timeMax=time_max,
-                                maxResults=50,
-                            )
-                            .execute()
-                        )
-
-                        def parse_date(x):
-                            y = x.get("date")
-                            if y:
-                                y = datetime.datetime.strptime(y, "%Y-%m-%d")
-                            else:
-                                y = x.get("dateTime")
-                                if y:
-                                    y = datetime.datetime.strptime(
-                                        y[:-6], "%Y-%m-%dT%H:%M:%S"
-                                    )
-                                else:
-                                    y = None
-                            return y
+            _ = date_str.get("date")
+            if _:
+                retval = datetime.datetime.strptime(_, "%Y-%m-%d")
+            else:
+                _ = date_str.get("dateTime")
+                if _:
+                    retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
+            return retval
+        except:
+            pass
+        return None
 
-                        for event in events["items"]:
-                            try:
-                                summary = event["summary"]
-                                self.debug_print(
-                                    "event '%s' (%s to %s)"
-                                    % (summary, event["start"], event["end"])
+    def get_events_from_interesting_calendars(
+        self, time_min: str, time_max: str
+    ) -> Tuple[List[comparable_event], List[comparable_event]]:
+        page_token = None
+        sortable_events = []
+        countdown_events = []
+        while True:
+            calendar_list = (
+                self.client.calendarList().list(pageToken=page_token).execute()
+            )
+            for calendar in calendar_list["items"]:
+                if calendar["summary"] in gcal_renderer.calendar_whitelist:
+                    self.debug_print(
+                        f"{calendar['summary']} is an interesting calendar..."
+                    )
+                    events = (
+                        self.client.events()
+                        .list(
+                            calendarId=calendar["id"],
+                            singleEvents=True,
+                            timeMin=time_min,
+                            timeMax=time_max,
+                            maxResults=50,
+                        )
+                        .execute()
+                    )
+                    for event in events["items"]:
+                        summary = event["summary"]
+                        self.debug_print(
+                            f" ... event '{summary}' ({event['start']} to {event['end']}"
+                        )
+                        start = gcal_renderer.parse_date(event["start"])
+                        end = gcal_renderer.parse_date(event["end"])
+                        if start is not None and end is not None:
+                            sortable_events.append(
+                                gcal_renderer.comparable_event(
+                                    start, end, summary, calendar["summary"]
                                 )
-                                start = parse_date(event["start"])
-                                end = parse_date(event["end"])
-                                self.sortable_events.append(
+                            )
+                            if (
+                                "countdown" in summary
+                                or "Holidays" in calendar["summary"]
+                                or "Countdown" in summary
+                            ):
+                                self.debug_print(" ... event is countdown worthy!")
+                                countdown_events.append(
                                     gcal_renderer.comparable_event(
                                         start, end, summary, calendar["summary"]
                                     )
                                 )
-                                if (
-                                    "countdown" in summary
-                                    or "Holidays" in calendar["summary"]
-                                    or "Countdown" in summary
-                                ):
-                                    self.debug_print("event is countdown worthy")
-                                    self.countdown_events.append(
-                                        gcal_renderer.comparable_event(
-                                            start, end, summary, calendar["summary"]
-                                        )
-                                    )
-                            except Exception as e:
-                                print("gcal unknown exception, skipping event.")
-                    else:
-                        self.debug_print("Skipping calendar '%s'" % calendar["summary"])
-                page_token = calendar_list.get("nextPageToken")
-                if not page_token:
-                    break
+            page_token = calendar_list.get("nextPageToken")
+            if not page_token:
+                break
+        return (sortable_events, countdown_events)
 
+    def render_upcoming_events(self) -> bool:
+        (time_min, time_max) = self.get_min_max_timewindow()
+        try:
+            # Populate the "Upcoming Events" page.
+            (
+                self.sortable_events,
+                self.countdown_events,
+            ) = self.get_events_from_interesting_calendars(time_min, time_max)
             self.sortable_events.sort()
-            upcoming_sortable_events = self.sortable_events[:12]
-            for event in upcoming_sortable_events:
-                self.debug_print("sorted event: %s" % event.friendly_name())
-                f.write(
-                    """
+            with file_writer.file_writer("gcal_3_86400.html") as f:
+                f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
+                f.write("<center><table width=96%>\n")
+                upcoming_sortable_events = self.sortable_events[:12]
+                for event in upcoming_sortable_events:
+                    f.write(
+                        f"""
 <tr>
   <td style="padding-right: 1em;">
-    %s
+    {event.timestamp()}
   </td>
   <td style="padding-left: 1em;">
-    %s
+    {event.friendly_name()}
   </td>
 </tr>\n"""
-                    % (event.timestamp(), event.friendly_name())
-                )
-            f.write("</table></center>\n")
-            f.close()
+                    )
+                f.write("</table></center>\n")
 
+            # Populate the "Countdown" page.
             self.countdown_events.sort()
-            upcoming_countdown_events = self.countdown_events[:12]
-            now = datetime.datetime.now()
-            count = 0
-            timestamps = {}
-            for event in upcoming_countdown_events:
-                eventstamp = event.start_time
-                delta = eventstamp - now
-                name = event.friendly_name()
-                x = int(delta.total_seconds())
-                if x > 0:
-                    identifier = "id%d" % count
-                    days = divmod(x, constants.seconds_per_day)
-                    hours = divmod(days[1], constants.seconds_per_hour)
-                    minutes = divmod(hours[1], constants.seconds_per_minute)
-                    g.write(
-                        '<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
-                        % (identifier, days[0], hours[0], minutes[0], name)
-                    )
-                    timestamps[identifier] = time.mktime(eventstamp.timetuple())
-                    count += 1
-                    self.debug_print(
-                        "countdown to %s is %dd %dh %dm"
-                        % (name, days[0], hours[0], minutes[0])
-                    )
-            g.write("</ul>")
-            g.write("<SCRIPT>\nlet timestampMap = new Map([")
-            for x in list(timestamps.keys()):
-                g.write('    ["%s", %f],\n' % (x, timestamps[x] * 1000.0))
-            g.write("]);\n\n")
-            g.write(
-                """
+            with file_writer.file_writer("countdown_3_7200.html") as g:
+                g.write("<h1>Countdowns:</h1><hr><ul>\n")
+                now = datetime.datetime.now()
+                upcoming_countdown_events = self.countdown_events[:12]
+                count = 0
+                timestamps = {}
+                for event in upcoming_countdown_events:
+                    eventstamp = event.start_time
+                    name = event.friendly_name()
+                    delta = eventstamp - now
+                    x = int(delta.total_seconds())
+                    if x > 0:
+                        identifier = "id%d" % count
+                        days = divmod(x, constants.seconds_per_day)
+                        hours = divmod(days[1], constants.seconds_per_hour)
+                        minutes = divmod(hours[1], constants.seconds_per_minute)
+                        g.write(
+                            f'<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
+                            % (
+                                identifier,
+                                int(days[0]),
+                                int(hours[0]),
+                                int(minutes[0]),
+                                name,
+                            )
+                        )
+                        timestamps[identifier] = time.mktime(eventstamp.timetuple())
+                        count += 1
+                        self.debug_print(
+                            "countdown to %s is %dd %dh %dm"
+                            % (name, days[0], hours[0], minutes[0])
+                        )
+                g.write("</ul>")
+                g.write("<SCRIPT>\nlet timestampMap = new Map([")
+                for x in list(timestamps.keys()):
+                    g.write(f'    ["{x}", {timestamps[x] * 1000.0}],\n')
+                g.write("]);\n\n")
+                g.write(
+                    """
 // Pad things with a leading zero if necessary.
 function pad(n) {
     return (n < 10) ? ("0" + n) : n;
@@ -258,8 +287,7 @@ var fn = setInterval(function() {
     }
 }, 1000);
 </script>"""
-            )
-            g.close()
+                )
             return True
         except (gdata.service.RequestError, AccessTokenRefreshError):
             print("********* TRYING TO REFRESH GCAL CLIENT *********")
@@ -269,32 +297,38 @@ var fn = setInterval(function() {
         except:
             raise
 
-    def look_for_triggered_events(self):
-        f = file_writer.file_writer(constants.gcal_imminent_pagename)
-        f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
-        f.write("<center><table width=99%>\n")
-        now = datetime.datetime.now()
-        count = 0
-        for event in self.sortable_events:
-            eventstamp = event.start_time
-            delta = eventstamp - now
-            x = int(delta.total_seconds())
-            if x > 0 and x <= constants.seconds_per_minute * 3:
-                days = divmod(x, constants.seconds_per_day)
-                hours = divmod(days[1], constants.seconds_per_hour)
-                minutes = divmod(hours[1], constants.seconds_per_minute)
+    def look_for_triggered_events(self) -> bool:
+        with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
+            f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
+            f.write("<center><table width=99%>\n")
+            now = datetime.datetime.now()
+            count = 0
+            for event in self.sortable_events:
                 eventstamp = event.start_time
-                name = event.friendly_name()
-                calendar = event.calendar
-                f.write(
-                    "<LI> %s (%s) upcoming in %d minutes.\n"
-                    % (name, calendar, minutes[0])
-                )
-                count += 1
-        f.write("</table>")
-        f.close()
+                delta = eventstamp - now
+                x = int(delta.total_seconds())
+                if x > 0 and x <= constants.seconds_per_minute * 3:
+                    days = divmod(x, constants.seconds_per_day)
+                    hours = divmod(days[1], constants.seconds_per_hour)
+                    minutes = divmod(hours[1], constants.seconds_per_minute)
+                    eventstamp = event.start_time
+                    name = event.friendly_name()
+                    calendar = event.calendar
+                    f.write(
+                        f"<LI> {name} ({calendar}) upcoming in {int(minutes[0])} minutes.\n"
+                    )
+                    count += 1
+            f.write("</table>")
         if count > 0:
             globals.put("gcal_triggered", True)
         else:
             globals.put("gcal_triggered", False)
         return True
+
+
+# Test
+# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
+# x = gcal_renderer(
+#    {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+#    oauth)
+# x.periodic_render("Render Upcoming Events")