Convert to use pyutilz / scottutilz libraries.
[kiosk.git] / camera_trigger.py
1 #!/usr/bin/env python3
2
3 import logging
4 import os
5 import time
6 from typing import List, Tuple, Optional
7
8 import trigger
9
10
# Module-level logger.  It is named after the module (``__name__``) rather
# than the file path (``__file__``): logger names are split on "." to form a
# hierarchy, so a path ending in ".py" would create a spurious child logger
# called "py" and could not be targeted by package-style logging config.
logger = logging.getLogger(__name__)
12
13
class any_camera_trigger(trigger.trigger):
    """Trigger that requests a camera page when a camera just saw motion.

    For every known camera the trigger looks at two files:

    * ``/timestamps/last_camera_motion_<camera>``: its ``st_ctime`` is taken
      as the time of the most recent motion event.
    * ``/timestamps/camera_motion_history_<camera>``: one integer timestamp
      per line, used to count how often the camera fired recently.

    A camera whose latest event is recent is reported unless it has fired
    too often within the history window ("spammy"); a spammy camera is
    still reported when more than one camera fired during the same poll.
    """

    # Base priority contributed by each camera.  The keys double as the
    # (ordered) list of cameras that are polled.
    BASE_PRIORITY_BY_CAMERA = {
        "driveway": 3,
        "frontdoor": 2,
        "doorbell": 1,
        "cabin_driveway": 3,
    }

    # A motion event younger than this many seconds counts as "triggered".
    RECENT_TRIGGER_SEC = 15

    # Length of the window over which past events are counted (7 minutes).
    HISTORY_WINDOW_SEC = 60 * 7

    # More events than this inside the window makes a camera "spammy".
    MAX_TRIGGERS_IN_WINDOW = 4

    def __init__(self):
        # NOTE(review): the base class initializer is not invoked, matching
        # the original code; confirm trigger.trigger does not require it.

        # Per-camera count of history entries inside the 7 minute window, as
        # of the last time the camera was seen freshly triggered.
        self.triggers_in_the_past_seven_min = dict.fromkeys(
            self.BASE_PRIORITY_BY_CAMERA, 0
        )
        # Per-camera st_ctime of the marker file as last observed.
        self.last_trigger_timestamp = dict.fromkeys(
            self.BASE_PRIORITY_BY_CAMERA, 0
        )

    def choose_priority(self, camera: str, age: int) -> int:
        """Based on the camera name and last trigger age, compute priority.

        Args:
            camera: camera name; must be a key of BASE_PRIORITY_BY_CAMERA.
            age: seconds since the camera's last trigger.

        Returns:
            The camera's base priority plus PRIORITY_HIGH when age < 10,
            PRIORITY_NORMAL + age when 10 <= age < 30, and PRIORITY_LOW
            otherwise.

        Raises:
            KeyError: if camera is not a known camera name.
        """
        priority = self.BASE_PRIORITY_BY_CAMERA[camera]
        if age < 10:
            priority += trigger.trigger.PRIORITY_HIGH
        elif age < 30:
            # NOTE(review): adding the age means an older event ranks higher
            # than a newer one within this band; confirm this is intended.
            priority += trigger.trigger.PRIORITY_NORMAL + age
        else:
            priority += trigger.trigger.PRIORITY_LOW
        return priority

    def _count_recent_history(self, camera: str, now: float) -> int:
        """Count history entries for camera newer than HISTORY_WINDOW_SEC.

        Blank lines in the history file are skipped.  Raises OSError when
        the file cannot be read and ValueError when a non-blank line is not
        an integer.
        """
        filename = f"/timestamps/camera_motion_history_{camera}"
        count = 0
        with open(filename, "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                if now - int(line) < self.HISTORY_WINDOW_SEC:
                    count += 1
                    print(f'{camera} past 7m: {count}')
        return count

    def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
        """Return a list of triggered pages with priorities.

        Returns:
            A list of (page, priority) tuples, one per camera that should be
            shown, or None when no camera qualifies.
        """
        triggers = []
        num_cameras_with_recent_triggers = 0
        now = time.time()

        # First pass, just see whether each camera is triggered and, if so,
        # count how many times in the past 7m it has been triggered.  Errors
        # are handled per camera so that one missing or malformed file does
        # not suppress the triggers of every other camera.
        for camera in self.BASE_PRIORITY_BY_CAMERA:
            filename = f"/timestamps/last_camera_motion_{camera}"
            try:
                ts = os.stat(filename).st_ctime
                age = now - ts
                print(f'{camera} => {age}')
                if ts == self.last_trigger_timestamp[camera]:
                    # Nothing new since the previous poll.
                    continue
                self.last_trigger_timestamp[camera] = ts
                if age >= self.RECENT_TRIGGER_SEC:
                    continue

                logger.info(f'{camera} is triggered; {filename} touched {age}s ago (@{ts})')
                num_cameras_with_recent_triggers += 1

                # Reset first so a failed history read leaves the count at
                # zero (camera treated as not spammy) instead of stale.
                self.triggers_in_the_past_seven_min[camera] = 0
                self.triggers_in_the_past_seven_min[camera] = (
                    self._count_recent_history(camera, now)
                )
            except (OSError, ValueError):
                logger.exception(
                    "Unable to read motion timestamps for camera %s", camera
                )

        # Second pass, see whether we want to trigger due to camera activity
        # we found.  All camera timestamps were just considered and should
        # be up-to-date.  Some logic to squelch spammy cameras unless more
        # than one is triggered at the same time.
        print(f'{num_cameras_with_recent_triggers}')
        for camera in self.BASE_PRIORITY_BY_CAMERA:
            age = now - self.last_trigger_timestamp[camera]
            if age >= self.RECENT_TRIGGER_SEC:
                continue

            recent_count = self.triggers_in_the_past_seven_min[camera]
            if (
                recent_count > self.MAX_TRIGGERS_IN_WINDOW
                and num_cameras_with_recent_triggers <= 1
            ):
                logger.info(f'{camera} is too spammy; {recent_count} events in the past 7m.  Ignoring it.')
                continue

            print(f'{camera} has {recent_count} triggers in the past 7m.')
            print(f'{num_cameras_with_recent_triggers} cameras are triggered right now.')

            priority = self.choose_priority(camera, int(age))
            page = f"hidden/unwrapped_{camera}.html"
            print(f'*** CAMERA TRIGGER ({page} @ {priority}) ***')
            triggers.append((page, priority))

        if not triggers:
            return None
        logger.info('There are active camera triggers!')
        return triggers
113
114
115 # x = any_camera_trigger()
116 # print(x.get_triggered_page_list())