contents of several Google calendars."""
import datetime
-import gdata # type: ignore
-import gdata_oauth
-from oauth2client.client import AccessTokenRefreshError # type: ignore
-import os
+import functools
+import logging
import time
-from typing import Dict, List, Optional, Tuple
+from typing import Any, Dict, List, Optional, Tuple
+
+from dateutil.parser import parse
+import gdata_oauth
+import pytz
-import constants
+import kiosk_constants
import file_writer
import globals
import renderer
-import secrets
-class gcal_renderer(renderer.debuggable_abstaining_renderer):
+logger = logging.getLogger(__file__)
+
+
+class gcal_renderer(renderer.abstaining_renderer):
"""A renderer to fetch upcoming events from www.google.com/calendar"""
calendar_whitelist = frozenset(
]
)
+ @functools.total_ordering
class comparable_event(object):
"""A helper class to sort events."""
) -> None:
if start_time is None:
assert(end_time is None)
+ else:
+ assert(isinstance(start_time, datetime.datetime))
+ if end_time is not None:
+ assert(isinstance(end_time, datetime.datetime))
self.start_time = start_time
self.end_time = end_time
self.summary = summary
that.calendar,
)
- def __str__(self) -> str:
+ def __eq__(self, that) -> bool:
+ return (
+ self.start_time == that.start_time and
+ self.end_time == that.end_time and
+ self.summary == that.summary and
+ self.calendar == that.calendar
+ )
+
+ def __repr__(self) -> str:
return "[%s] %s" % (self.timestamp(), self.friendly_name())
def friendly_name(self) -> str:
return "<B>%s</B>" % name
def timestamp(self) -> str:
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
if self.start_time is None:
return "None"
- elif self.start_time.hour == 0:
- return datetime.datetime.strftime(self.start_time, "%a %b %d %Y")
+ elif (
+ self.start_time.hour == 0 and
+ self.start_time.minute == 0 and
+ self.start_time.second == 0
+ ):
+ if self.start_time.year == now.year:
+ return datetime.datetime.strftime(
+ self.start_time,
+ "%a %b %d"
+ )
+ else:
+ return datetime.datetime.strftime(
+ self.start_time,
+ "%a %b %d, %Y"
+ )
else:
- return datetime.datetime.strftime(
- self.start_time, "%a %b %d %Y %H:%M%p"
- )
+ dt = self.start_time
+ zone = dt.tzinfo
+ local_dt = dt.replace(tzinfo=zone).astimezone(tz=pytz.timezone('US/Pacific'))
+ if local_dt.year == now.year:
+ return datetime.datetime.strftime(
+ local_dt, "%a %b %d %I:%M%p"
+ )
+ else:
+ return datetime.datetime.strftime(
+ local_dt, "%a %b %d, %Y %I:%M%p"
+ )
def __init__(
self, name_to_timeout_dict: Dict[str, int], oauth: gdata_oauth.OAuth
) -> None:
- super(gcal_renderer, self).__init__(name_to_timeout_dict, True)
+ super().__init__(name_to_timeout_dict)
self.oauth = oauth
self.client = self.oauth.calendar_service()
self.sortable_events: List[gcal_renderer.comparable_event] = []
return "gcal"
def periodic_render(self, key: str) -> bool:
- self.debug_print('called for "%s"' % key)
+ logger.debug('called for "%s"' % key)
if key == "Render Upcoming Events":
return self.render_upcoming_events()
elif key == "Look For Triggered Events":
raise Exception("Unexpected operation")
def get_min_max_timewindow(self) -> Tuple[str, str]:
- now = datetime.datetime.now()
- _time_min = now - datetime.timedelta(1)
- _time_max = now + datetime.timedelta(95)
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
+ _time_min = now - datetime.timedelta(hours=6)
+ _time_max = now + datetime.timedelta(days=95)
time_min = datetime.datetime.strftime(_time_min, "%Y-%m-%dT%H:%M:%SZ")
time_max = datetime.datetime.strftime(_time_max, "%Y-%m-%dT%H:%M:%SZ")
- self.debug_print(f"time_min is {time_min}")
- self.debug_print(f"time_max is {time_max}")
+ logger.debug(f"time_min is {time_min}")
+ logger.debug(f"time_max is {time_max}")
return (time_min, time_max)
@staticmethod
- def parse_date(date_str: str) -> Optional[datetime.datetime]:
- retval = None
- try:
- _ = date_str.get("date")
- if _:
- retval = datetime.datetime.strptime(_, "%Y-%m-%d")
- else:
- _ = date_str.get("dateTime")
- if _:
- retval = datetime.datetime.strptime(_[:-6], "%Y-%m-%dT%H:%M:%S")
- return retval
- except:
- pass
+ def parse_date(date: Any) -> Optional[datetime.datetime]:
+ if isinstance(date, datetime.datetime):
+ return date
+ elif isinstance(date, dict):
+ if 'dateTime' in date:
+ d = date['dateTime']
+ dt = parse(d)
+ if dt.tzinfo is None:
+ dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific'))
+ return dt
+ elif 'date' in date:
+ d = date['date']
+ dt = datetime.datetime.strptime(d, '%Y-%m-%d')
+ dt = dt.replace(tzinfo=None).astimezone(tz=pytz.timezone('US/Pacific'))
+ return dt
+ print(f'Not sure what to do with this {date} ({type(date)}), help?!')
return None
def get_events_from_interesting_calendars(
)
for calendar in calendar_list["items"]:
if calendar["summary"] in gcal_renderer.calendar_whitelist:
- self.debug_print(
+ logger.debug(
f"{calendar['summary']} is an interesting calendar..."
)
events = (
)
for event in events["items"]:
summary = event["summary"]
- self.debug_print(
- f" ... event '{summary}' ({event['start']} to {event['end']}"
- )
start = gcal_renderer.parse_date(event["start"])
end = gcal_renderer.parse_date(event["end"])
+ logger.debug(
+ f" ... event '{summary}' ({event['start']} ({start}) to {event['end']} ({end})"
+ )
if start is not None and end is not None:
+ logger.debug(f' ... adding {summary} to sortable_events')
sortable_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
or "Holidays" in calendar["summary"]
or "Countdown" in summary
):
- self.debug_print(" ... event is countdown worthy!")
+ logger.debug(f" ... adding {summary} to countdown_events")
countdown_events.append(
gcal_renderer.comparable_event(
start, end, summary, calendar["summary"]
) = self.get_events_from_interesting_calendars(time_min, time_max)
self.sortable_events.sort()
with file_writer.file_writer("gcal_3_86400.html") as f:
- f.write("<h1>Upcoming Calendar Events:</h1><hr>\n")
- f.write("<center><table width=96%>\n")
+ f.write(
+f"""
+<h1>Upcoming Calendar Events:</h1>
+<hr>
+<center>
+<table width=96% style="border-collapse: collapse;">
+"""
+ )
upcoming_sortable_events = self.sortable_events[:12]
- for event in upcoming_sortable_events:
+ for n, event in enumerate(upcoming_sortable_events):
+ logger.debug(f'{n}/12: {event.friendly_name()} / {event.calendar}')
+ if n % 2 == 0:
+ color = "#c6b0b0"
+ else:
+ color = "#eeeeee"
f.write(
- f"""
-<tr>
- <td style="padding-right: 1em;">
- {event.timestamp()}
- </td>
- <td style="padding-left: 1em;">
- {event.friendly_name()}
- </td>
-</tr>\n"""
+f"""
+ <tr>
+ <td style="margin: 0; padding: 0; background: {color};">
+ {event.timestamp()}
+ </td>
+ <td style="margin: 0; padding: 0; background: {color};">
+ {event.friendly_name()}
+ </td>
+ </tr>
+"""
)
f.write("</table></center>\n")
self.countdown_events.sort()
with file_writer.file_writer("countdown_3_7200.html") as g:
g.write("<h1>Countdowns:</h1><hr><ul>\n")
- now = datetime.datetime.now()
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
upcoming_countdown_events = self.countdown_events[:12]
count = 0
timestamps = {}
x = int(delta.total_seconds())
if x > 0:
identifier = "id%d" % count
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ days = divmod(x, kiosk_constants.seconds_per_day)
+ hours = divmod(days[1], kiosk_constants.seconds_per_hour)
+ minutes = divmod(hours[1], kiosk_constants.seconds_per_minute)
g.write(
f'<li><SPAN id="%s">%d days, %02d:%02d</SPAN> until %s</li>\n'
% (
)
timestamps[identifier] = time.mktime(eventstamp.timetuple())
count += 1
- self.debug_print(
+ logger.debug(
"countdown to %s is %dd %dh %dm"
% (name, days[0], hours[0], minutes[0])
)
</script>"""
)
return True
- except (gdata.service.RequestError, AccessTokenRefreshError):
+ except Exception as e:
+ logger.exception(e)
print("********* TRYING TO REFRESH GCAL CLIENT *********")
- self.oauth.refresh_token()
- self.client = self.oauth.calendar_service()
+# self.oauth.refresh_token()
+# self.client = self.oauth.calendar_service()
return False
- except:
- raise
def look_for_triggered_events(self) -> bool:
- with file_writer.file_writer(constants.gcal_imminent_pagename) as f:
+ with file_writer.file_writer(kiosk_constants.gcal_imminent_pagename) as f:
f.write("<h1>Imminent Upcoming Calendar Events:</h1>\n<hr>\n")
f.write("<center><table width=99%>\n")
- now = datetime.datetime.now()
+ now = datetime.datetime.now(pytz.timezone("US/Pacific"))
count = 0
for event in self.sortable_events:
eventstamp = event.start_time
if eventstamp is None:
- return False
+ continue
delta = eventstamp - now
x = int(delta.total_seconds())
- if x > 0 and x <= constants.seconds_per_minute * 3:
- days = divmod(x, constants.seconds_per_day)
- hours = divmod(days[1], constants.seconds_per_hour)
- minutes = divmod(hours[1], constants.seconds_per_minute)
+ if x > -120 and x < 4 * kiosk_constants.seconds_per_minute:
+ days = divmod(x, kiosk_constants.seconds_per_day)
+ hours = divmod(days[1], kiosk_constants.seconds_per_hour)
+ minutes = divmod(hours[1], kiosk_constants.seconds_per_minute)
eventstamp = event.start_time
name = event.friendly_name()
calendar = event.calendar
# Test
-# oauth = gdata_oauth.OAuth(secrets.google_client_id, secrets.google_client_secret)
-# x = gcal_renderer(
-# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
-# oauth)
-# x.periodic_render("Render Upcoming Events")
+#oauth = gdata_oauth.OAuth(secrets.google_client_secret)
+#x = gcal_renderer(
+# {"Render Upcoming Events": 10000, "Look For Triggered Events": 1},
+# oauth)
+#x.periodic_render("Render Upcoming Events")