Add invw.org
[kiosk.git] / chooser.py
1 #!/usr/bin/env python3
2
3 from abc import ABC, abstractmethod
4 import logging
5 import os
6 import random
7 import re
8 import time
9 from typing import Any, Callable, List, Optional, Set, Tuple
10
11 from pyutils import logging_utils
12 from pyutils.datetimes import datetime_utils
13
14 import kiosk_constants
15 import trigger
16
17
18 logger = logging.getLogger(__name__)
19
20
class chooser(ABC):
    """Base class of a thing that chooses pages.

    Subclasses implement choose_next_page().  This base class supplies
    get_page_list(), which scans kiosk_constants.pages_dir for page files
    that are currently eligible to be shown.
    """

    # Page filenames have three underscore-separated fields followed by
    # ".html": a name, a run of digits, and a freshness field.  Compiled once
    # (as a raw string, so the backslashes reach the regex engine without
    # tripping Python's invalid-escape-sequence warning).
    _VALID_FILENAME = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")

    def __init__(self):
        pass

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def get_page_list(self) -> List[str]:
        """Return the filenames in the pages directory that may be shown.

        A regular file is a candidate when its name matches the page filename
        pattern.  The third field of the name is either the literal "none"
        (no freshness check) or an integer maximum age in seconds; files whose
        modification time is older than that are skipped.

        Files with a non-numeric freshness field, or that cannot be stat'ed
        (e.g. removed while we were scanning), are skipped with a warning
        rather than aborting the whole scan.

        Returns:
            Candidate filenames (not full paths), in os.listdir() order.
        """
        now = time.time()
        pages_dir = kiosk_constants.pages_dir
        filenames = []
        for page in os.listdir(pages_dir):
            path = os.path.join(pages_dir, page)
            if not os.path.isfile(path):
                continue

            # NOTE(review): match() only anchors at the start, so names with
            # trailing text after ".html" also match -- confirm this is wanted.
            result = self._VALID_FILENAME.match(page)
            if result is None:
                continue

            freshness = result.group(3)
            if freshness != "none":
                try:
                    freshness_requirement = int(freshness)
                except ValueError:
                    logger.warning(
                        f'"{page}" has an invalid freshness field "{freshness}"; skipping.'
                    )
                    continue
                try:
                    last_modified = int(os.path.getmtime(path))
                except OSError as e:
                    logger.warning(f'"{page}" could not be examined ({e}); skipping.')
                    continue
                age = now - last_modified
                if age > freshness_requirement:
                    logger.warning(f'"{page}" is too old.')
                    continue

            logger.info(f'candidate page: "{page}"')
            filenames.append(page)
        return filenames

    @abstractmethod
    def choose_next_page(self) -> Any:
        """Return the next page to show; the exact shape is up to the subclass."""
        pass
56
57
class weighted_random_chooser(chooser):
    """Chooser that does it via weighted RNG.

    Each candidate page's weight is the integer in the second
    underscore-separated field of its filename; a page is picked with
    probability proportional to its weight among the pages that pass every
    filter.
    """

    def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
        """Set up the chooser.

        Args:
            filter_list: optional predicates.  Each is called with a chosen
                page filename and must return a true value for the page to be
                shown; a false value suppresses it.  None means no filters.
        """
        super().__init__()
        self.last_choice = None
        # Raw string so the regex escapes are not treated as (invalid) Python
        # string escapes.
        self.valid_filename = re.compile(r"([^_]+)_(\d+)_([^\.]+)\.html")
        self.pages: Optional[List[str]] = None
        self.count = 0
        self.filter_list: List[Callable[[str], bool]] = []
        if filter_list is not None:
            self.filter_list.extend(filter_list)

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def choose_next_page(self) -> Any:
        """Pick the next page by weighted random choice.

        The candidate list is (re)read from disk on the first call and
        whenever the count of pages handed out so far is a multiple of 100.

        Returns:
            The filename of the chosen page.

        Raises:
            Exception: if no candidate page has a positive weight, or if the
                filters reject every candidate.
        """
        if self.pages is None or self.count % 100 == 0:
            logger.info("refreshing the candidate pages list.")
            self.pages = self.get_page_list()

        # Keep pages and weights in lockstep; zero-weight pages can never be
        # drawn, so leave them out of the pool entirely.
        candidates: List[str] = []
        weights: List[int] = []
        for page in self.pages:
            result = self.valid_filename.match(page)
            if result is None:
                continue
            weight = int(result.group(2))
            if weight > 0:
                candidates.append(page)
                weights.append(weight)
        if not candidates:
            raise Exception("no candidate pages with a positive weight")

        while candidates:
            index = random.choices(range(len(candidates)), weights=weights, k=1)[0]
            choice = candidates[index]

            # Allow filters list to suppress pages.
            if all(f(choice) for f in self.filter_list):
                # We're good...
                self.count += 1
                return choice

            # Drop the rejected page and redraw among the rest.  This yields
            # the same distribution over acceptable pages as blind retrying,
            # but is guaranteed to stop once every page has been rejected.
            del candidates[index]
            del weights[index]

        raise Exception("every candidate page was rejected by the filter list")
109
110
class weighted_random_chooser_with_triggers(weighted_random_chooser):
    """Same as WRC but has trigger events.

    Pages reported by triggers are queued as (page, priority) tuples and are
    served, highest priority first, before any other selection rule applies.
    """

    def __init__(
        self,
        trigger_list: Optional[List[trigger.trigger]],
        filter_list: List[Callable[[str], bool]],
    ) -> None:
        """Set up the chooser.

        Args:
            trigger_list: triggers to poll on every choice; None means none.
            filter_list: page filters, handed to the parent class unchanged.
        """
        super().__init__(filter_list)
        self.trigger_list: List[trigger.trigger] = (
            [] if trigger_list is None else list(trigger_list)
        )
        # Pending (page, priority) entries contributed by triggers.
        self.page_queue: Set[Tuple[str, int]] = set()

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def check_for_triggers(self) -> bool:
        """Poll every trigger and queue whatever pages they report.

        Returns:
            True if at least one trigger reported a page during this poll.
        """
        noticed = False
        for trig in self.trigger_list:
            fired = trig.get_triggered_page_list()
            if fired is None or len(fired) == 0:
                continue
            for entry in fired:
                self.page_queue.add(entry)
                logger.info(f"noticed active trigger {entry}")
                noticed = True
        return noticed

    @logging_utils.LoggingContext(logger, prefix="chooser:")
    def choose_next_page(self) -> Tuple[str, bool]:
        """Pick the next page, preferring triggered pages.

        Order of precedence: the highest-priority queued trigger page, then a
        page with "clock" in its name when the hour reported by
        datetime_utils.now_pacific() is before 6, then the parent's weighted
        random choice.

        Returns:
            (page, triggered).  The flag is only ever True for a page taken
            from the queue, and then only if this call's poll noticed a
            trigger.
        """
        needs_refresh = self.pages is None or self.count % 100 == 0
        if needs_refresh:
            logger.info("refreshing the candidates page list")
            self.pages = self.get_page_list()

        triggered = self.check_for_triggers()

        # Queued trigger pages win over everything else.
        if self.page_queue:
            logger.info("page queue has entries; pulling choice from there.")
            best = max(self.page_queue, key=lambda entry: entry[1])
            page, priority = best[0], best[1]
            self.page_queue.remove((page, priority))
            return (page, triggered)

        # Overnight, show the clock if there is one.
        if datetime_utils.now_pacific().hour < 6:
            clock_page = next((p for p in self.pages if "clock" in p), None)
            if clock_page is not None:
                return (clock_page, False)

        # Otherwise defer to the weighted random choice.
        return (weighted_random_chooser.choose_next_page(self), False)
168
169
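# Manual smoke test (left commented out): build a chooser with a dinnertime
# news filter and print one choice.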
171 # def filter_news_during_dinnertime(page):
172 #    now = datetime.datetime.now()
173 #    is_dinnertime = now.hour >= 17 and now.hour <= 20
174 #    return not is_dinnertime or not (
175 #        "cnn" in page
176 #        or "news" in page
177 #        or "mynorthwest" in page
178 #        or "seattle" in page
179 #        or "stranger" in page
180 #        or "twitter" in page
181 #        or "wsj" in page
182 #    )
183 # x = weighted_random_chooser_with_triggers([], [ filter_news_during_dinnertime ])
184 # print(x.choose_next_page())