More changes related to running on new kiosk.house.
[kiosk.git] / camera_trigger.py
1 #!/usr/bin/env python3
2
3 from datetime import datetime
4 import glob
5 import os
6 import time
7 from typing import List, Tuple, Optional
8
9 import trigger
10 import utils
11
12
class any_camera_trigger(trigger.trigger):
    """Trigger kiosk pages in response to recent camera motion.

    Motion state is read from files under ``/timestamps``: the ctime of
    ``last_camera_motion_<camera>`` marks the most recent motion event, and
    ``camera_motion_history_<camera>`` holds one integer epoch timestamp
    (in seconds) per line.
    """

    # Base priority per camera; also defines which cameras are polled and
    # in what order.
    _BASE_PRIORITY_BY_CAMERA = {
        "driveway": 3,
        "frontdoor": 2,
        "doorbell": 1,
        "cabin_driveway": 3,
    }

    # A motion event younger than this many seconds counts as "triggered".
    _RECENT_TRIGGER_SEC = 10
    # Window over which a camera's motion history is counted.
    _HISTORY_WINDOW_SEC = 60 * 7
    # A lone camera with more history entries than this is squelched.
    _MAX_TRIGGERS_IN_WINDOW = 4

    def __init__(self):
        # Per-camera count of history entries inside the seven-minute
        # window, refreshed whenever a new motion event is accepted.
        self.triggers_in_the_past_seven_min = {
            camera: 0 for camera in self._BASE_PRIORITY_BY_CAMERA
        }
        # Per-camera ctime of the motion file at the last accepted event.
        self.last_trigger_timestamp = {
            camera: 0 for camera in self._BASE_PRIORITY_BY_CAMERA
        }

    def choose_priority(self, camera: str, age: int) -> int:
        """Based on the camera name and last trigger age, compute priority.

        Args:
            camera: a key of the per-camera base priority table; any other
                name raises KeyError.
            age: seconds elapsed since the camera's last trigger.

        Returns:
            The camera's base priority plus an age-dependent component
            built from the trigger.trigger.PRIORITY_* constants.
        """
        priority = self._BASE_PRIORITY_BY_CAMERA[camera]
        if age < 10:
            priority += trigger.trigger.PRIORITY_HIGH
        elif age < 30:
            # NOTE(review): priority grows with age inside this band;
            # confirm that is intended.
            priority += trigger.trigger.PRIORITY_NORMAL + age
        else:
            priority += trigger.trigger.PRIORITY_LOW
        return priority

    def _count_recent_history(self, camera: str, now: float) -> int:
        """Count the camera's history entries younger than the window.

        Blank lines are ignored.  Raises OSError if the history file cannot
        be read and ValueError if a non-blank line is not an integer.
        """
        filename = f"/timestamps/camera_motion_history_{camera}"
        with open(filename, "r") as f:
            return sum(
                1
                for line in f
                # int() tolerates surrounding whitespace, including "\n".
                if line.strip() and (now - int(line)) < self._HISTORY_WINDOW_SEC
            )

    def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
        """Return a list of triggered pages with priorities.

        Returns:
            A list of ``(page, priority)`` tuples, one per camera whose last
            accepted motion event is under ten seconds old and which is not
            squelched, or None when there are none.
        """
        now = time.time()
        num_cameras_with_recent_triggers = 0

        # First pass: detect cameras with a new, recent motion event and
        # refresh their seven-minute history count.  Failures are isolated
        # per camera so one missing or corrupt file cannot suppress the
        # others.
        for camera in self._BASE_PRIORITY_BY_CAMERA:
            try:
                # NOTE(review): st_ctime is the metadata-change time on Unix;
                # st_mtime may be what is wanted here -- confirm.
                ts = os.stat(f"/timestamps/last_camera_motion_{camera}").st_ctime
                if (
                    ts == self.last_trigger_timestamp[camera]
                    or (now - ts) >= self._RECENT_TRIGGER_SEC
                ):
                    continue
                recent_count = self._count_recent_history(camera, now)
            except (OSError, ValueError) as e:
                print(f"Camera {camera}: unable to read motion state: {e}")
                continue

            # Commit state only once everything for this camera was read.
            print(f"Camera: {camera}, age {now - ts}")
            self.last_trigger_timestamp[camera] = ts
            self.triggers_in_the_past_seven_min[camera] = recent_count
            num_cameras_with_recent_triggers += 1

        # Second pass: decide which recently triggered cameras to report.
        # A spammy camera is squelched unless more than one camera fired
        # during this poll.
        triggers = []
        for camera in self._BASE_PRIORITY_BY_CAMERA:
            # Age of this camera's own trigger, not a value left over from
            # parsing some camera's history file.
            age = now - self.last_trigger_timestamp[camera]
            if age >= self._RECENT_TRIGGER_SEC:
                continue
            if (
                self.triggers_in_the_past_seven_min[camera]
                <= self._MAX_TRIGGERS_IN_WINDOW
                or num_cameras_with_recent_triggers > 1
            ):
                priority = self.choose_priority(camera, int(age))
                print(
                    f"{utils.timestamp()}: *** {camera}[{priority}] CAMERA TRIGGER ***"
                )
                triggers.append((f"hidden/{camera}.html", priority))
            else:
                print(f"{utils.timestamp()}: Camera {camera} too spammy, squelching it")

        return triggers or None
104
105
106 # x = any_camera_trigger()
107 # print(x.get_triggered_page_list())