mypy clean
[kiosk.git] / gcal_renderer.py
1 #!/usr/bin/env python3
2
3 """Renders an upcoming events page and countdowns page based on the
4 contents of several Google calendars."""
5
6 import datetime
7 import gdata  # type: ignore
8 import gdata_oauth
9 from oauth2client.client import AccessTokenRefreshError  # type: ignore
10 import os
11 import time
12 from typing import Dict, List, Optional, Tuple
13
14 import constants
15 import file_writer
16 import globals
17 import renderer
18 import secrets
19
20
class gcal_renderer(renderer.debuggable_abstaining_renderer):
    """A renderer to fetch upcoming events from www.google.com/calendar

    Events are read from every calendar whose name appears in
    ``calendar_whitelist`` and rendered into three pages: an "upcoming
    events" table, a "countdowns" list (kept live by a small script
    embedded in the page) and an "imminent events" page for anything
    starting within the next three minutes.
    """

    # Names (the calendar "summary" field) of the calendars worth reading.
    # NOTE(review): the "[email protected]" entry looks like an e-mail
    # address that was mangled by web-page obfuscation -- confirm the real
    # calendar name before relying on it.
    calendar_whitelist = frozenset(
        [
            "Alex's calendar",
            "Family",
            "Holidays in United States",
            "Lynn Gasch",
            "Lynn's Work",
            "[email protected]",
            "Scott Gasch External - Misc",
            "Birthdays",  # <-- from g+ contacts
        ]
    )

    # Maximum number of entries listed on the upcoming and countdown pages.
    _MAX_LISTED_EVENTS = 12

    class comparable_event(object):
        """A helper class to sort events.

        Events order by start time, then end time, then summary, then
        calendar name.  A missing start (or end) time sorts before any
        real time.
        """

        def __init__(
            self,
            start_time: Optional[datetime.datetime],
            end_time: Optional[datetime.datetime],
            summary: str,
            calendar: str,
        ) -> None:
            # An event without a start cannot meaningfully have an end.
            if start_time is None:
                assert end_time is None
            self.start_time = start_time
            self.end_time = end_time
            self.summary = summary
            self.calendar = calendar

        def __lt__(self, that) -> bool:
            mine, theirs = self.start_time, that.start_time
            if mine is None and theirs is None:
                return self.summary < that.summary
            if mine is None or theirs is None:
                return mine is None
            if mine != theirs:
                return mine < theirs
            if self.end_time != that.end_time:
                # A datetime cannot be compared with None, so decide this
                # case by hand: the event with no end time goes first.
                if self.end_time is None or that.end_time is None:
                    return self.end_time is None
                return self.end_time < that.end_time
            return (self.summary, self.calendar) < (that.summary, that.calendar)

        def __str__(self) -> str:
            return "[%s]&nbsp;%s" % (self.timestamp(), self.friendly_name())

        def friendly_name(self) -> str:
            """Return the event title as bold HTML, minus any "countdown:" tag."""
            import html

            name = self.summary.replace("countdown:", "")
            # Titles are free text typed into a calendar; escape them so
            # characters such as '<' or '&' cannot break the page markup.
            return "<B>%s</B>" % html.escape(name, quote=False)

        def timestamp(self) -> str:
            """Return the start time as display text ("None" if unknown).

            Events starting at exactly midnight (which is what all-day
            events parse to) show the date only.
            """
            if self.start_time is None:
                return "None"
            if self.start_time.time() == datetime.time.min:
                return self.start_time.strftime("%a %b %d %Y")
            # %I (12-hour clock) is the hour directive that pairs with %p.
            return self.start_time.strftime("%a %b %d %Y %I:%M%p")

    def __init__(
        self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
    ) -> None:
        """Remember the oauth helper and obtain a calendar client from it."""
        super().__init__(name_to_timeout_dict, True)
        self.oauth = oauth
        self.client = self.oauth.calendar_service()
        self.sortable_events: List[gcal_renderer.comparable_event] = []
        self.countdown_events: List[gcal_renderer.comparable_event] = []

    def debug_prefix(self) -> str:
        return "gcal"

    def periodic_render(self, key: str) -> bool:
        """Dispatch the named periodic operation; raise on an unknown key."""
        self.debug_print('called for "%s"' % key)
        if key == "Render Upcoming Events":
            return self.render_upcoming_events()
        elif key == "Look For Triggered Events":
            return self.look_for_triggered_events()
        else:
            raise Exception("Unexpected operation")

    def get_min_max_timewindow(self) -> Tuple[str, str]:
        """Return (time_min, time_max) timestamps bounding the event query.

        The window runs from one day ago to 95 days ahead.  The strings end
        in "Z" (UTC), so they are computed from the current UTC time.
        """
        now = datetime.datetime.now(datetime.timezone.utc)
        stamp_format = "%Y-%m-%dT%H:%M:%SZ"
        time_min = (now - datetime.timedelta(days=1)).strftime(stamp_format)
        time_max = (now + datetime.timedelta(days=95)).strftime(stamp_format)
        self.debug_print(f"time_min is {time_min}")
        self.debug_print(f"time_max is {time_max}")
        return (time_min, time_max)

    @staticmethod
    def parse_date(date_str: Dict[str, str]) -> Optional[datetime.datetime]:
        """Parse an event's "start" or "end" mapping into a naive datetime.

        A "date" entry (YYYY-MM-DD) yields midnight of that day.  Otherwise
        a "dateTime" entry is parsed from its first 19 characters
        (YYYY-MM-DDTHH:MM:SS); whatever follows -- a UTC offset, a "Z" or
        fractional seconds -- is discarded, so the result is the wall-clock
        time exactly as written.  Returns None when neither entry is
        present or the value cannot be parsed.
        """
        try:
            all_day = date_str.get("date")
            if all_day:
                return datetime.datetime.strptime(all_day, "%Y-%m-%d")
            timed = date_str.get("dateTime")
            if timed:
                return datetime.datetime.strptime(timed[:19], "%Y-%m-%dT%H:%M:%S")
        except (AttributeError, TypeError, ValueError):
            # Best effort: an unparseable date just means "no date".
            pass
        return None

    def _list_events(
        self, calendar_id: str, time_min: str, time_max: str
    ) -> List[Dict]:
        """Fetch every event of one calendar in the window, across all pages."""
        collected: List[Dict] = []
        page_token = None
        while True:
            response = (
                self.client.events()
                .list(
                    calendarId=calendar_id,
                    singleEvents=True,
                    timeMin=time_min,
                    timeMax=time_max,
                    maxResults=50,
                    pageToken=page_token,
                )
                .execute()
            )
            collected.extend(response.get("items", []))
            page_token = response.get("nextPageToken")
            if not page_token:
                return collected

    def get_events_from_interesting_calendars(
        self, time_min: str, time_max: str
    ) -> Tuple[List[comparable_event], List[comparable_event]]:
        """Collect events from every whitelisted calendar.

        Returns (sortable_events, countdown_events).  Every event with a
        parseable start and end goes in the first list.  The second list
        holds the subset whose title contains "countdown"/"Countdown" or
        whose calendar name contains "Holidays".  Neither list is sorted.
        """
        sortable_events: List[gcal_renderer.comparable_event] = []
        countdown_events: List[gcal_renderer.comparable_event] = []
        page_token = None
        while True:
            calendar_list = (
                self.client.calendarList().list(pageToken=page_token).execute()
            )
            for calendar in calendar_list.get("items", []):
                calendar_name = calendar["summary"]
                if calendar_name not in gcal_renderer.calendar_whitelist:
                    continue
                self.debug_print(f"{calendar_name} is an interesting calendar...")
                for event in self._list_events(calendar["id"], time_min, time_max):
                    # Events are not guaranteed to carry a title.
                    summary = event.get("summary", "(No title)")
                    raw_start = event.get("start", {})
                    raw_end = event.get("end", {})
                    self.debug_print(
                        f" ... event '{summary}' ({raw_start} to {raw_end})"
                    )
                    start = gcal_renderer.parse_date(raw_start)
                    end = gcal_renderer.parse_date(raw_end)
                    if start is None or end is None:
                        continue
                    sortable_events.append(
                        gcal_renderer.comparable_event(
                            start, end, summary, calendar_name
                        )
                    )
                    if (
                        "countdown" in summary
                        or "Countdown" in summary
                        or "Holidays" in calendar_name
                    ):
                        self.debug_print(" ... event is countdown worthy!")
                        countdown_events.append(
                            gcal_renderer.comparable_event(
                                start, end, summary, calendar_name
                            )
                        )
            page_token = calendar_list.get("nextPageToken")
            if not page_token:
                break
        return (sortable_events, countdown_events)

    def _write_upcoming_events_page(self) -> None:
        """Write the first few of self.sortable_events as an HTML table."""
        with file_writer.file_writer("gcal_3_86400.html") as f:
            f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
            f.write("<center><table width=96%>\n")
            for event in self.sortable_events[: self._MAX_LISTED_EVENTS]:
                f.write(
                    f"""
<tr>
  <td style="padding-right: 1em;">
    {event.timestamp()}
  </td>
  <td style="padding-left: 1em;">
    {event.friendly_name()}
  </td>
</tr>\n"""
                )
            f.write("</table></center>\n")

    def _write_countdown_page(self) -> None:
        """Write the countdown page for the next few future countdown events."""
        now = datetime.datetime.now()
        # Element id -> event start in seconds since the epoch; the embedded
        # script uses this map to refresh each countdown once a second.
        timestamps: Dict[str, float] = {}
        with file_writer.file_writer("countdown_3_7200.html") as g:
            g.write("<h1>Countdowns:</h1><hr><ul>\n")
            for event in self.countdown_events:
                # Apply the cap to events actually shown, so that events
                # already in the past do not use up slots.
                if len(timestamps) >= self._MAX_LISTED_EVENTS:
                    break
                start = event.start_time
                if start is None:
                    continue
                remaining = int((start - now).total_seconds())
                if remaining <= 0:
                    continue
                identifier = "id%d" % len(timestamps)
                name = event.friendly_name()
                days, rest = divmod(remaining, constants.seconds_per_day)
                hours, rest = divmod(rest, constants.seconds_per_hour)
                minutes = rest // constants.seconds_per_minute
                g.write(
                    '<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
                    % (identifier, int(days), int(hours), int(minutes), name)
                )
                timestamps[identifier] = time.mktime(start.timetuple())
                self.debug_print(
                    "countdown to %s is %dd %dh %dm" % (name, days, hours, minutes)
                )
            g.write("</ul>")
            g.write("<SCRIPT>\nlet timestampMap = new Map([")
            for identifier, seconds in timestamps.items():
                # The script compares against Date.getTime(): milliseconds.
                g.write(f'    ["{identifier}", {seconds * 1000.0}],\n')
            g.write("]);\n\n")
            g.write(
                """
// Pad things with a leading zero if necessary.
function pad(n) {
    return (n < 10) ? ("0" + n) : n;
}

// Return an 's' if things are plural.
function plural(n) {
    return (n == 1) ? "" : "s";
}

// Periodic function to run the page timers.
var fn = setInterval(function() {
    var now = new Date().getTime();
    for (let [id, timestamp] of timestampMap) {
        var delta = timestamp - now;

        if (delta > 0) {
            var days = Math.floor(delta / (1000 * 60 * 60 * 24));
            var hours = pad(Math.floor((delta % (1000 * 60 * 60 * 24)) / (1000 * 60 * 60)));
            var minutes = pad(Math.floor((delta % (1000 * 60 * 60)) / (1000 * 60)));
            var seconds = pad(Math.floor((delta % (1000 * 60)) / 1000));

            var s = days + " day" + plural(days) + ", ";
            s = s + hours + ":" + minutes;
            document.getElementById(id).innerHTML = s;
        } else {
            document.getElementById(id).innerHTML = "EXPIRED";
        }
    }
}, 1000);
</script>"""
            )

    def render_upcoming_events(self) -> bool:
        """Refresh the cached events and rewrite the upcoming/countdown pages.

        Returns True on success.  When one of the request/token errors
        listed below is raised, the oauth token and the client are
        refreshed and False is returned; any other exception propagates.
        """
        (time_min, time_max) = self.get_min_max_timewindow()
        try:
            (
                self.sortable_events,
                self.countdown_events,
            ) = self.get_events_from_interesting_calendars(time_min, time_max)
            self.sortable_events.sort()
            self.countdown_events.sort()

            # Populate the "Upcoming Events" page.
            self._write_upcoming_events_page()

            # Populate the "Countdown" page.
            self._write_countdown_page()
            return True
        # NOTE(review): self.client is driven through .list(...).execute()
        # calls; confirm these are the exception types that client actually
        # raises on a failed or unauthorized request.
        except (gdata.service.RequestError, AccessTokenRefreshError):
            print("********* TRYING TO REFRESH GCAL CLIENT *********")
            self.oauth.refresh_token()
            self.client = self.oauth.calendar_service()
            return False

    def look_for_triggered_events(self) -> bool:
        """Write the "imminent events" page and publish the trigger flag.

        Lists every cached event that starts within the next three minutes
        and stores True under "gcal_triggered" in globals when at least one
        such event exists, False otherwise.  Always returns True.
        """
        now = datetime.datetime.now()
        window = constants.seconds_per_minute * 3
        count = 0
        with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
            f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
            f.write("<center><table width=99%>\n")
            for event in self.sortable_events:
                start = event.start_time
                if start is None:
                    continue
                remaining = int((start - now).total_seconds())
                if 0 < remaining <= window:
                    minutes = int(remaining // constants.seconds_per_minute)
                    name = event.friendly_name()
                    calendar = event.calendar
                    f.write(
                        f"<tr><td>{name} ({calendar}) upcoming in "
                        f"{minutes} minutes.</td></tr>\n"
                    )
                    count += 1
            f.write("</table></center>\n")
        globals.put("gcal_triggered", count > 0)
        return True
326
327
328 # Test
329 # oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
330 # x = gcal_renderer(
331 #    {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
332 #    oauth)
333 # x.periodic_render("Render Upcoming Events")