Merge branch 'master' of ssh://git.house:/usr/local/git/base/kiosk
[kiosk.git] / camera_trigger.py
1 #!/usr/bin/env python3
2
3 import logging
4 import os
5 import time
6 from typing import List, Tuple, Optional
7
8 import trigger
9
10
11 logger = logging.getLogger(__name__)
12
13
14 class any_camera_trigger(trigger.trigger):
15     def __init__(self):
16         self.triggers_in_the_past_seven_min = {
17             "driveway": 0,
18             "frontdoor": 0,
19             "doorbell": 0,
20             "cabin_driveway": 0,
21         }
22         self.last_trigger_timestamp = {
23             "driveway": 0,
24             "frontdoor": 0,
25             "doorbell": 0,
26             "cabin_driveway": 0,
27         }
28
29     def choose_priority(self, camera: str, age: int) -> int:
30         """Based on the camera name and last trigger age, compute priority."""
31         base_priority_by_camera = {
32             "driveway": 3,
33             "frontdoor": 2,
34             "doorbell": 1,
35             "cabin_driveway": 3,
36         }
37         priority = base_priority_by_camera[camera]
38         if age < 10:
39             priority += trigger.trigger.PRIORITY_HIGH
40         elif age < 30:
41             priority += trigger.trigger.PRIORITY_NORMAL + age
42         else:
43             priority += trigger.trigger.PRIORITY_LOW
44         return priority
45
46     def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
47         """Return a list of triggered pages with priorities."""
48         triggers = []
49         num_cameras_with_recent_triggers = 0
50         camera_list = ["driveway", "frontdoor", "doorbell", "cabin_driveway"]
51
52         now = time.time()
53         try:
54             # First pass, just see whether each camera is triggered
55             # and, if so, count how many times in the past 7m it has
56             # been triggered.
57             for camera in camera_list:
58                 filename = f"/timestamps/last_camera_motion_{camera}"
59                 ts = os.stat(filename).st_ctime
60                 age = now - ts
61                 print(f"{camera} => {age}")
62                 if ts != self.last_trigger_timestamp[camera]:
63                     self.last_trigger_timestamp[camera] = ts
64                     if age < 15:
65                         logger.info(
66                             f"{camera} is triggered; {filename} touched {age}s ago (@{ts}"
67                         )
68                         num_cameras_with_recent_triggers += 1
69
70                         self.triggers_in_the_past_seven_min[camera] = 0
71                         filename = f"/timestamps/camera_motion_history_{camera}"
72                         with open(filename, "r") as f:
73                             contents = f.readlines()
74                         for x in contents:
75                             x = x.strip()
76                             age = now - int(x)
77                             if age < (60 * 7):
78                                 self.triggers_in_the_past_seven_min[camera] += 1
79                                 print(
80                                     f"{camera} past 7m: {self.triggers_in_the_past_seven_min[camera]}"
81                                 )
82
83             # Second pass, see whether we want to trigger due to
84             # camera activity we found.  All cameras timestamps were
85             # just considered and should be up-to-date.  Some logic to
86             # squelch spammy cameras unless more than one is triggered
87             # at the same time.
88             print(f"{num_cameras_with_recent_triggers}")
89             for camera in camera_list:
90                 if (now - self.last_trigger_timestamp[camera]) < 15:
91                     if (
92                         self.triggers_in_the_past_seven_min[camera] <= 4
93                         or num_cameras_with_recent_triggers > 1
94                     ):
95                         print(
96                             f"{camera} has {self.triggers_in_the_past_seven_min[camera]} triggers in the past 7d."
97                         )
98                         print(
99                             f"{num_cameras_with_recent_triggers} cameras are triggered right now."
100                         )
101
102                         age = now - self.last_trigger_timestamp[camera]
103                         priority = self.choose_priority(camera, int(age))
104                         print(
105                             f"*** CAMERA TRIGGER (hidden/{camera}.html @ {priority}) ***"
106                         )
107                         triggers.append(
108                             (
109                                 f"hidden/unwrapped_{camera}.html",
110                                 priority,
111                             )
112                         )
113                     else:
114                         logger.info(
115                             f"{camera} is too spammy; {self.triggers_in_the_past_seven_min[camera]} events in the past 7m.  Ignoring it."
116                         )
117         except Exception:
118             logger.exception()
119
120         if len(triggers) == 0:
121             return None
122         else:
123             logger.info("There are active camera triggers!")
124             return triggers
125
126
127 # x = any_camera_trigger()
128 # print(x.get_triggered_page_list())