This is the current running state of the kiosk sans the secrets.
[kiosk.git] / chooser.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import logging
5 import os
6 import random
7 import re
8 import time
9 from typing import Any, Callable, List, Optional, Set, Tuple
10
11 from pyutils.datetimes import datetime_utils
12
13 import kiosk_constants
14 import trigger
15
16
# Module-level logger used by every chooser class in this file. It is named
# after this file's path (__file__).
logger = logging.getLogger(__file__)
18
19
class chooser(ABC):
    """Base class of a thing that chooses pages.

    Subclasses implement choose_next_page(); this base class only knows how
    to enumerate the candidate page files on disk.
    """

    def __init__(self):
        pass

    def get_page_list(self) -> List[str]:
        """Return the filenames in the pages directory that may be shown.

        A candidate is a regular file in kiosk_constants.pages_dir whose name
        matches ``<name>_<digits>_<freshness>.html``.  The freshness field is
        either the literal "none" (the page never expires) or a maximum age
        in seconds; a page whose modification time is older than that is
        skipped.

        Pages whose freshness field is not an integer, or whose file
        disappears while it is being examined, are logged and skipped rather
        than aborting the whole scan.

        Returns:
            The list of acceptable page filenames (not full paths).
        """
        now = time.time()
        # Raw string: "\d" and "\." are regex escapes, not string escapes.
        valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
        filenames = []
        for page in os.listdir(kiosk_constants.pages_dir):
            path = os.path.join(kiosk_constants.pages_dir, page)
            if not os.path.isfile(path):
                continue
            result = valid_filename.match(page)
            if result is None:
                continue

            freshness = result.group(3)
            if freshness != "none":
                try:
                    freshness_requirement = int(freshness)
                except ValueError:
                    logger.warning(
                        f'chooser: "{page}" has an invalid freshness field; skipping.'
                    )
                    continue
                try:
                    last_modified = int(os.path.getmtime(path))
                except OSError:
                    # The file may be removed or replaced between the
                    # directory listing and this stat call.
                    logger.warning(
                        f'chooser: "{page}" vanished while being examined; skipping.'
                    )
                    continue
                age = now - last_modified
                if age > freshness_requirement:
                    logger.warning(
                        f'chooser: "{page}" is too old.'
                    )
                    continue

            logger.info(
                f'chooser: candidate page: "{page}"'
            )
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Return the next page to show; the concrete type is up to the subclass."""
        pass
58
59
class weighted_random_chooser(chooser):
    """Chooser that does it via weighted RNG.

    Page filenames look like ``<name>_<weight>_<freshness>.html``; the integer
    weight field determines how likely the page is to be picked relative to
    the other candidates.
    """

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Create the chooser.

        Args:
            filter_list: optional callables taking a page filename and
                returning False to veto that page.  A filter that prevents the
                same page from being chosen twice in a row is always appended
                after the caller's filters.
        """
        super().__init__()
        self.last_choice = None
        # Raw string: "\d" and "\." are regex escapes, not string escapes.
        self.valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
        self.pages: Optional[List[str]] = None
        self.count = 0
        # Set by choose_next_page() when exactly one page has a positive
        # weight, so the no-repeat filter cannot reject the only option.
        self._single_candidate = False
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)
        self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)

    def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
        """Veto `choice` if it equals the previously accepted page.

        When accepted, `choice` is remembered as the new last choice.  If only
        one page is currently choosable the repeat is allowed, because
        rejecting it would leave nothing to show.
        """
        is_repeat = self.last_choice is not None and choice == self.last_choice
        if is_repeat and not self._single_candidate:
            return False
        self.last_choice = choice
        return True

    def choose_next_page(self) -> Any:
        """Pick a page at random, weighted by the weight in its filename.

        The candidate list is (re)loaded on the first call and again whenever
        the number of successful picks is a multiple of 100.  A drawn page is
        returned only if every filter accepts it; otherwise another page is
        drawn.

        NOTE: if the caller-supplied filters reject every candidate this
        method keeps drawing and does not return.

        Returns:
            The filename of the chosen page.

        Raises:
            Exception: if no candidate page has a positive weight.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info('chooser: refreshing the candidate pages list.')
            self.pages = self.get_page_list()

        # Keep candidates and weights strictly parallel so a filename that
        # does not match the pattern cannot shift the index of the others.
        candidates = []
        weights = []
        for page in self.pages:
            result = self.valid_filename.match(page)
            if result is not None:
                candidates.append(page)
                weights.append(int(result.group(2)))
        if sum(weights) <= 0:
            raise Exception('chooser: no candidate pages with a positive weight.')

        self._single_candidate = sum(1 for w in weights if w > 0) == 1

        while True:
            # random.choices draws proportionally over the full weight range.
            # The previous randrange(0, total - 1) excluded the top of the
            # range and raised ValueError when the total weight was 1.
            choice = random.choices(candidates, weights=weights, k=1)[0]

            # Allow filters list to suppress pages; stop at the first veto.
            if not all(f(choice) for f in self.filter_list):
                continue

            # We're good...
            self.count += 1
            return choice
117
118
class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Weighted random chooser that also honors trigger events.

    Pages reported by triggers are queued and served, highest priority first,
    before any random choice is made.
    """

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: List[Callable[[str], bool]],
    ) -> None:
        """Store the triggers to poll and start with an empty page queue."""
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = (
            list(trigger_list) if trigger_list is not None else []
        )
        # Pending (page, priority) pairs reported by triggers.
        self.page_queue: Set[Tuple[str, int]] = set()

    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue whatever pages they report.

        Returns:
            True if at least one trigger reported a page during this poll.
        """
        fired = False
        for trig in self.trigger_list:
            reported = trig.get_triggered_page_list()
            if reported is None or len(reported) == 0:
                continue
            for entry in reported:
                self.page_queue.add(entry)
                logger.info(f'chooser: noticed active trigger {entry}')
                fired = True
        return fired

    def choose_next_page(self) -> Tuple[str, bool]:
        """Choose the next page, preferring queued trigger pages.

        Order of preference: the highest-priority queued page; then, before
        06:00 Pacific time, the first candidate whose name contains "clock";
        then a weighted random pick.

        Returns:
            A (page, triggered) tuple.  `triggered` is the result of this
            call's trigger poll when the page came from the queue, and False
            otherwise.
        """
        needs_refresh = self.pages is None or self.count % 100 == 0
        if needs_refresh:
            logger.info('chooser: refreshing the candidates page list')
            self.pages = self.get_page_list()

        was_triggered = self.check_for_triggers()

        # First try to satisfy from the page queue.
        if self.page_queue:
            logger.info('chooser: page queue has entries; pulling choice from there.')
            best_page = None
            best_priority = None
            for entry in self.page_queue:
                if best_priority is None or entry[1] > best_priority:
                    best_page, best_priority = entry[0], entry[1]
            assert best_page is not None
            assert best_priority is not None
            self.page_queue.remove((best_page, best_priority))
            return (best_page, was_triggered)

        # Always show the clock in the middle of the night.
        if datetime_utils.now_pacific().hour < 6:
            clock_page = next((p for p in self.pages if "clock" in p), None)
            if clock_page is not None:
                return (clock_page, False)

        # Fall back on weighted random choice.
        return (weighted_random_chooser.choose_next_page(self), False)
174
175
176 # Test
177 # def filter_news_during_dinnertime(page):
178 #    now = datetime.datetime.now()
179 #    is_dinnertime = now.hour >= 17 and now.hour <= 20
180 #    return not is_dinnertime or not (
181 #        "cnn" in page
182 #        or "news" in page
183 #        or "mynorthwest" in page
184 #        or "seattle" in page
185 #        or "stranger" in page
186 #        or "twitter" in page
187 #        or "wsj" in page
188 #    )
189 # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
190 # print(x.choose_next_page())