projects
/
kiosk.git
/ blobdiff
commit
grep
author
committer
pickaxe
?
search:
re
summary
|
shortlog
|
log
|
commit
|
commitdiff
|
tree
raw
|
inline
| side by side
More cleanup.
[kiosk.git]
/
chooser.py
diff --git
a/chooser.py
b/chooser.py
index 3fecc09717ff47c43988d76ac167b79cc2e4ef3d..3514c976cb7c7149f539cefd25952d91fa97bf30 100644
(file)
--- a/
chooser.py
+++ b/
chooser.py
@@
-1,22
+1,28
@@
#!/usr/bin/env python3
from abc import ABC, abstractmethod
#!/usr/bin/env python3
from abc import ABC, abstractmethod
-import datetime
-import glob
+import logging
import os
import random
import re
import os
import random
import re
-import sys
import time
from typing import Any, Callable, List, Optional, Set, Tuple
import time
from typing import Any, Callable, List, Optional, Set, Tuple
+import datetime_utils
+
import constants
import trigger
import constants
import trigger
+logger = logging.getLogger(__file__)
+
+
class chooser(ABC):
"""Base class of a thing that chooses pages"""
class chooser(ABC):
"""Base class of a thing that chooses pages"""
+ def __init__(self):
+ pass
+
def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
def get_page_list(self) -> List[str]:
now = time.time()
valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
@@
-29,7
+35,6
@@
class chooser(ABC):
for page in pages:
result = re.match(valid_filename, page)
if result is not None:
for page in pages:
result = re.match(valid_filename, page)
if result is not None:
- print(f'chooser: candidate page: "{page}"')
if result.group(3) != "none":
freshness_requirement = int(result.group(3))
last_modified = int(
if result.group(3) != "none":
freshness_requirement = int(result.group(3))
last_modified = int(
@@
-37,8
+42,13
@@
class chooser(ABC):
)
age = now - last_modified
if age > freshness_requirement:
)
age = now - last_modified
if age > freshness_requirement:
- print(f'chooser: "{page}" is too old.')
+ logger.warning(
+ f'chooser: "{page}" is too old.'
+ )
continue
continue
+ logger.info(
+ f'chooser: candidate page: "{page}"'
+ )
filenames.append(page)
return filenames
filenames.append(page)
return filenames
@@
-51,7
+61,8
@@
class weighted_random_chooser(chooser):
"""Chooser that does it via weighted RNG."""
def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
"""Chooser that does it via weighted RNG."""
def __init__(self, filter_list: Optional[List[Callable[[str], bool]]]) -> None:
- self.last_choice = ""
+ super().__init__()
+ self.last_choice = None
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
self.pages: Optional[List[str]] = None
self.count = 0
self.valid_filename = re.compile("([^_]+)_(\d+)_([^\.]+)\.html")
self.pages: Optional[List[str]] = None
self.count = 0
@@
-61,13
+72,14
@@
class weighted_random_chooser(chooser):
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
self.filter_list.append(self.dont_choose_page_twice_in_a_row_filter)
def dont_choose_page_twice_in_a_row_filter(self, choice: str) -> bool:
- if choice == self.last_choice:
+ if
self.last_choice is not None and
choice == self.last_choice:
return False
self.last_choice = choice
return True
def choose_next_page(self) -> Any:
if self.pages is None or self.count % 100 == 0:
return False
self.last_choice = choice
return True
def choose_next_page(self) -> Any:
if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidate pages list.')
self.pages = self.get_page_list()
total_weight = 0
self.pages = self.get_page_list()
total_weight = 0
@@
-90,11
+102,10
@@
class weighted_random_chooser(chooser):
break
choice = self.pages[x]
break
choice = self.pages[x]
- # Allow filter list to suppress pages.
+ # Allow filter
s
list to suppress pages.
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
choice_is_filtered = False
for f in self.filter_list:
if not f(choice):
- print(f"chooser: {choice} filtered by {f.__name__}")
choice_is_filtered = True
break
if choice_is_filtered:
choice_is_filtered = True
break
if choice_is_filtered:
@@
-113,7
+124,7
@@
class weighted_random_chooser_with_triggers(weighted_random_chooser):
trigger_list: Optional[List[trigger.trigger]],
filter_list: List[Callable[[str], bool]],
) -> None:
trigger_list: Optional[List[trigger.trigger]],
filter_list: List[Callable[[str], bool]],
) -> None:
-
weighted_random_chooser.__init__(self,
filter_list)
+
super().__init__(
filter_list)
self.trigger_list: List[trigger.trigger] = []
if trigger_list is not None:
self.trigger_list.extend(trigger_list)
self.trigger_list: List[trigger.trigger] = []
if trigger_list is not None:
self.trigger_list.extend(trigger_list)
@@
-126,19
+137,20
@@
class weighted_random_chooser_with_triggers(weighted_random_chooser):
if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
if x is not None and len(x) > 0:
for y in x:
self.page_queue.add(y)
+ logger.info(f'chooser: noticed active trigger {y}')
triggered = True
return triggered
def choose_next_page(self) -> Tuple[str, bool]:
if self.pages is None or self.count % 100 == 0:
triggered = True
return triggered
def choose_next_page(self) -> Tuple[str, bool]:
if self.pages is None or self.count % 100 == 0:
+ logger.info('chooser: refreshing the candidates page list')
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
self.pages = self.get_page_list()
triggered = self.check_for_triggers()
# First try to satisfy from the page queue.
- now = datetime.datetime.now()
if len(self.page_queue) > 0:
if len(self.page_queue) > 0:
-
print("chooser: Pulling page from queue..."
)
+
logger.info('chooser: page queue has entries; pulling choice from there.'
)
page = None
priority = None
for t in self.page_queue:
page = None
priority = None
for t in self.page_queue:
@@
-151,14
+163,14
@@
class weighted_random_chooser_with_triggers(weighted_random_chooser):
return (page, triggered)
# Always show the clock in the middle of the night.
return (page, triggered)
# Always show the clock in the middle of the night.
- elif now.hour < 7:
+ now = datetime_utils.now_pacific()
+ if now.hour < 6:
for page in self.pages:
if "clock" in page:
return (page, False)
# Fall back on weighted random choice.
for page in self.pages:
if "clock" in page:
return (page, False)
# Fall back on weighted random choice.
- else:
- return (weighted_random_chooser.choose_next_page(self), False)
+ return (weighted_random_chooser.choose_next_page(self), False)
# Test
# Test