triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
+ now = datetime.datetime.now()
if len(self.page_queue) > 0:
print("chooser: Pulling page from queue...")
page = None
self.page_queue.remove((page, priority))
return (page, triggered)
+ # Always show the clock in the middle of the night.
+ elif now.hour < 7:
+ for page in self.pages:
+ if "clock" in page:
+ return (page, False)
+
# Fall back on weighted random choice.
else:
return (weighted_random_chooser.choose_next_page(self), False)
pages_dir = "/var/www/kiosk/pages"
refresh_period_sec = 22.0
+refresh_period_night_sec = 600.0
render_period_sec = 30.0
seconds_per_minute = 60
import os
import time
-from typing import Dict
+from typing import Dict, List
import constants
import file_writer
#!/usr/bin/env python3
from datetime import datetime
+import gc
+import linecache
import os
import sys
from threading import Thread
import time
import traceback
+import tracemalloc
from typing import Optional
import constants
)
+def thread_janitor() -> None:
+ tracemalloc.start()
+ tracemalloc_target = 0.0
+ gc_target = 0.0
+ gc.enable()
+
+ while True:
+ now = time.time()
+ if now > tracemalloc_target:
+ tracemalloc_target = now + 30.0
+ snapshot = tracemalloc.take_snapshot()
+ snapshot = snapshot.filter_traces((
+ tracemalloc.Filter(False, "<frozen importlib._bootstrap>"),
+ tracemalloc.Filter(False, "<unknown>"),
+ ))
+ key_type = 'lineno'
+ limit = 10
+ top_stats = snapshot.statistics(key_type)
+ print("janitor: Top %s lines" % limit)
+ for index, stat in enumerate(top_stats[:limit], 1):
+ frame = stat.traceback[0]
+ # replace "/path/to/module/file.py" with "module/file.py"
+ filename = os.sep.join(frame.filename.split(os.sep)[-2:])
+ print("janitor: #%s: %s:%s: %.1f KiB"
+ % (index, filename, frame.lineno, stat.size / 1024))
+ line = linecache.getline(frame.filename, frame.lineno).strip()
+ if line:
+ print('janitor: %s' % line)
+
+ other = top_stats[limit:]
+ if other:
+ size = sum(stat.size for stat in other)
+ print("janitor: %s other: %.1f KiB" % (len(other), size / 1024))
+ total = sum(stat.size for stat in top_stats)
+ print("janitor: Total allocated size: %.1f KiB" % (total / 1024))
+ if now > gc_target:
+ print("janitor: Running gc operation")
+ gc_target = now + 60.0
+ gc.collect()
+ time.sleep(10.0)
+
+
def thread_change_current() -> None:
page_chooser = chooser.weighted_random_chooser_with_triggers(
trigger_catalog.get_triggers(), [filter_news_during_dinnertime]
else:
return "FFFFFF"
+ def get_refresh_period() -> float:
+ now = datetime.now()
+ if now.hour < 7:
+ return constants.refresh_period_night_sec * 1000
+ else:
+ return constants.refresh_period_sec * 1000
+
age = utils.describe_age_of_file_briefly(f"pages/{filename}")
bgcolor = pick_background_color()
f.write(
</BODY>"""
% (
bgcolor,
- constants.refresh_period_sec * 1000,
+ get_refresh_period(),
constants.hostname,
bgcolor,
filename,
logging.basicConfig()
changer_thread: Optional[Thread] = None
renderer_thread: Optional[Thread] = None
+ janitor_thread: Optional[Thread] = None
while True:
if changer_thread is None or not changer_thread.is_alive():
print(
)
renderer_thread = Thread(target=thread_invoke_renderers, args=())
renderer_thread.start()
+ if janitor_thread is None or not janitor_thread.is_alive():
+ print(
+ f"MAIN[{utils.timestamp()}] - (Re?)initializing janitor thread... (wtf?!)"
+ )
+ janitor_thread = Thread(target=thread_janitor, args=())
+ janitor_thread.start()
time.sleep(60)
import re
from typing import Dict, Set
-import constants
import file_writer
import renderer
class local_photos_mirror_renderer(renderer.debuggable_abstaining_renderer):
"""A renderer that uses a local mirror of Google photos"""
- ALBUM_ROOT_DIR = "/var/www/kiosk/pages/images/gphotos/albums"
+ album_root_directory = "/var/www/kiosk/pages/images/gphotos/albums"
album_whitelist = frozenset(
[
"""Walk the filesystem looking for photos in whitelisted albums and
keep their paths in memory.
"""
- for root, subdirs, files in os.walk(self.ALBUM_ROOT_DIR):
+ for root, subdirs, files in os.walk(self.album_root_directory):
last_dir = root.rsplit("/", 1)[1]
if self.album_is_in_whitelist(last_dir):
for filename in files:
if extension in self.extension_whitelist:
photo_path = os.path.join(root, filename)
photo_url = photo_path.replace(
- "/var/www/", f"http://{constants.hostname}/", 1
+ "/var/www/", "http://kiosk.house/", 1
)
self.candidate_photos.add(photo_url)
return True
super(showerthoughts_reddit_renderer, self).__init__(
name_to_timeout_dict,
["showerthoughts"],
- min_votes=350,
+ min_votes=250,
additional_filters=[
showerthoughts_reddit_renderer.dont_tell_me_about_gift_cards
],
"Seattle Times Segments",
),
weather_renderer.weather_renderer(
- {"Fetch Weather (Bellevue)": (hours * 4)}, "home"
+ {"Fetch Weather (Bellevue)": (hours * 6)}, "home"
),
weather_renderer.weather_renderer(
- {"Fetch Weather (Stevens)": (hours * 4)}, "stevens"
+ {"Fetch Weather (Stevens)": (hours * 6)}, "stevens"
),
- weather_renderer.weather_renderer({"Fetch Weather (Telma)": (hours * 4)}, "telma"),
+ weather_renderer.weather_renderer(
+ {"Fetch Weather (Telma)": (hours * 6)}, "telma"),
local_photos_mirror_renderer.local_photos_mirror_renderer(
{"Index Photos": (hours * 24), "Choose Photo": (always)}
),