Fix cameras, improve weather, delegate health renderer to a helper,
[kiosk.git] / camera_trigger.py
1 #!/usr/bin/env python3
2
3 import logging
4 import os
5 import time
6 from typing import List, Tuple, Optional
7
8 import trigger
9
10
11 logger = logging.getLogger(__file__)
12
13
class any_camera_trigger(trigger.trigger):
    """Trigger that surfaces a camera page when that camera sees motion.

    Motion is detected through files under /timestamps:

    * ``last_camera_motion_<camera>``: its ctime marks the latest event.
    * ``camera_motion_history_<camera>``: one integer timestamp per line,
      compared against ``time.time()`` to count recent events.

    A camera that fires too often is squelched unless more than one
    camera is triggered at the same time.
    """

    # Base priority per camera.  The key order of this table is also the
    # single source of truth for which cameras exist and in what order
    # they are evaluated.
    _BASE_PRIORITY_BY_CAMERA = {
        "driveway": 3,
        "frontdoor": 2,
        "doorbell": 1,
        "cabin_driveway": 3,
    }
    # A motion event younger than this many seconds counts as "live".
    _RECENT_TRIGGER_SEC = 10
    # Window over which history entries are counted for spam detection.
    _HISTORY_WINDOW_SEC = 60 * 7
    # More events than this within the window makes a camera "spammy".
    _MAX_EVENTS_IN_WINDOW = 4

    def __init__(self):
        # Per-camera count of history entries in the past seven minutes,
        # refreshed each time a new motion event is noticed.
        self.triggers_in_the_past_seven_min = dict.fromkeys(
            self._BASE_PRIORITY_BY_CAMERA, 0
        )
        # Per-camera ctime of the last motion event already noticed, so
        # the same event is not counted twice.
        self.last_trigger_timestamp = dict.fromkeys(
            self._BASE_PRIORITY_BY_CAMERA, 0
        )

    def choose_priority(self, camera: str, age: int) -> int:
        """Based on the camera name and last trigger age, compute priority.

        Args:
            camera: camera name; must be a key of the base priority table.
            age: seconds since the camera last triggered.

        Returns:
            The camera's base priority plus an age-dependent component.

        Raises:
            KeyError: if camera is not a known camera name.
        """
        priority = self._BASE_PRIORITY_BY_CAMERA[camera]
        if age < 10:
            priority += trigger.trigger.PRIORITY_HIGH
        elif age < 30:
            # NOTE(review): adding age makes older triggers in this tier
            # rank higher than newer ones -- confirm this is intended.
            # get_triggered_page_list only passes ages under 10, so this
            # branch is not reached from within this class.
            priority += trigger.trigger.PRIORITY_NORMAL + age
        else:
            priority += trigger.trigger.PRIORITY_LOW
        return priority

    def _count_recent_events(self, camera: str, now: float) -> int:
        """Count history entries for camera inside the history window.

        Blank lines are skipped.  Raises OSError if the history file
        cannot be read and ValueError if a line is not an integer.
        """
        filename = f"/timestamps/camera_motion_history_{camera}"
        with open(filename, "r") as f:
            stamps = [line.strip() for line in f]
        return sum(
            1
            for stamp in stamps
            if stamp and (now - int(stamp)) < self._HISTORY_WINDOW_SEC
        )

    def _note_new_motion(self, camera: str, now: float) -> bool:
        """Record a fresh, not-yet-seen motion event for camera, if any.

        Returns True when a new event was recorded.  Problems with this
        camera's files are logged here rather than raised so that they
        cannot prevent the remaining cameras from being evaluated.
        """
        filename = f"/timestamps/last_camera_motion_{camera}"
        try:
            ts = os.stat(filename).st_ctime
        except OSError as e:
            logger.exception(e)
            return False

        age = now - ts
        if not (
            ts != self.last_trigger_timestamp[camera]
            and age < self._RECENT_TRIGGER_SEC
        ):
            return False

        logger.info(f'{camera} is triggered; {filename} touched {age}s ago (@{ts})')
        self.last_trigger_timestamp[camera] = ts

        # Fail open: if the history cannot be read the count stays at
        # zero, so the camera is shown rather than silently squelched.
        self.triggers_in_the_past_seven_min[camera] = 0
        try:
            self.triggers_in_the_past_seven_min[camera] = self._count_recent_events(
                camera, now
            )
        except (OSError, ValueError) as e:
            logger.exception(e)
        return True

    def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
        """Return a list of triggered pages with priorities.

        Returns:
            A list of (page, priority) tuples, one per camera that should
            be shown right now, or None when there are none.  Errors are
            logged and never propagate to the caller.
        """
        triggers = []
        num_cameras_with_recent_triggers = 0
        now = time.time()

        try:
            # First pass, just see whether each camera is triggered
            # and, if so, count how many times in the past 7m it has
            # been triggered.
            for camera in self._BASE_PRIORITY_BY_CAMERA:
                if self._note_new_motion(camera, now):
                    num_cameras_with_recent_triggers += 1

            # Second pass, see whether we want to trigger due to
            # camera activity we found.  All camera timestamps were
            # just considered and should be up-to-date.  Some logic to
            # squelch spammy cameras unless more than one is triggered
            # at the same time.
            for camera in self._BASE_PRIORITY_BY_CAMERA:
                age = now - self.last_trigger_timestamp[camera]
                if not age < self._RECENT_TRIGGER_SEC:
                    continue

                count = self.triggers_in_the_past_seven_min[camera]
                if (
                    count > self._MAX_EVENTS_IN_WINDOW
                    and num_cameras_with_recent_triggers <= 1
                ):
                    logger.info(f'{camera} is too spammy; {count} events in the past 7m.  Ignoring it.')
                    continue

                logger.info(f'{camera} has {count} triggers in the past 7m.')
                logger.info(f'{num_cameras_with_recent_triggers} cameras are triggered right now.')

                priority = self.choose_priority(camera, int(age))
                page = f"hidden/unwrapped_{camera}.html"
                logger.info(f'*** CAMERA TRIGGER ({page} @ {priority}) ***')
                triggers.append((page, priority))
        except Exception as e:
            # Top-level boundary: a trigger must never take down the
            # caller, so log anything unexpected and fall through.
            logger.exception(e)

        if not triggers:
            return None
        logger.info('There are active camera triggers!')
        return triggers
109
110
111 # x = any_camera_trigger()
112 # print(x.get_triggered_page_list())