Towards mypy cleanliness.
[kiosk.git] / camera_trigger.py
1 #!/usr/bin/env python3
2
from datetime import datetime
import glob
import os
import time
from typing import List, Optional, Tuple

import trigger
import utils
11
12
class any_camera_trigger(trigger.trigger):
    """Trigger that reports a camera's page shortly after that camera sees motion.

    Motion is detected by polling files under ``/timestamps``:

    * ``last_camera_motion_<camera>`` -- its ctime is taken as the time of
      the most recent motion event for that camera.
    * ``camera_motion_history_<camera>`` -- one integer timestamp per line
      (presumably epoch seconds, since each is compared against
      ``time.time()`` -- confirm against whatever writes the file).
    """

    # Cameras polled on every call, in the order their pages are reported.
    _CAMERAS = ("driveway", "frontdoor", "cabin_driveway")

    # Per-camera offset added on top of the age-derived priority.
    _BASE_PRIORITY = {
        "driveway": 1,
        "frontdoor": 2,
        "cabin_driveway": 1,
    }

    # A camera counts as "triggered" while its last motion is younger than this.
    _TRIGGER_WINDOW_SEC = 10
    # Window over which history entries are counted to detect a spammy camera.
    _HISTORY_WINDOW_SEC = 60 * 7
    # A lone camera with more than this many recent triggers is squelched.
    _MAX_RECENT_TRIGGERS = 4

    def __init__(self) -> None:
        # Number of history entries younger than _HISTORY_WINDOW_SEC, refreshed
        # each time a new trigger is noticed for the camera.
        self.triggers_in_the_past_seven_min = {camera: 0 for camera in self._CAMERAS}
        # ctime of the most recent trigger already noticed for each camera;
        # seeded with a float so the mapping has one consistent value type.
        self.last_trigger_timestamp = {camera: 0.0 for camera in self._CAMERAS}

    def choose_priority(self, camera: str, age: int) -> int:
        """Based on the camera name and last trigger age, compute priority.

        Args:
            camera: one of the known camera names.
            age: seconds since the camera last triggered.

        Returns:
            The camera's base priority plus PRIORITY_HIGH when age < 10,
            PRIORITY_NORMAL + age when 10 <= age < 30, else PRIORITY_LOW.

        Raises:
            KeyError: if camera is not a known camera name.
        """
        priority = self._BASE_PRIORITY[camera]
        if age < 10:
            priority += trigger.trigger.PRIORITY_HIGH
        elif age < 30:
            priority += trigger.trigger.PRIORITY_NORMAL + age
        else:
            priority += trigger.trigger.PRIORITY_LOW
        return priority

    def _count_recent_history(self, camera: str, now: float) -> int:
        """Count history entries for camera younger than _HISTORY_WINDOW_SEC."""
        count = 0
        with open(f"/timestamps/camera_motion_history_{camera}", "r") as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                try:
                    stamp = int(line)
                except ValueError:
                    # One bad line should not discard the rest of the history.
                    print(f"Ignoring malformed history entry for {camera}: {line!r}")
                    continue
                if (now - stamp) < self._HISTORY_WINDOW_SEC:
                    count += 1
        return count

    def _refresh_camera(self, camera: str, now: float) -> bool:
        """Update cached state for one camera.

        Returns True when the camera has a trigger that is both new (not the
        one already recorded) and younger than _TRIGGER_WINDOW_SEC.

        Raises:
            OSError: if the camera's last-motion file cannot be stat'ed.
        """
        ts = os.stat(f"/timestamps/last_camera_motion_{camera}").st_ctime
        age = now - ts
        if ts == self.last_trigger_timestamp[camera] or age >= self._TRIGGER_WINDOW_SEC:
            return False

        print(f"Camera: {camera}, age {age}")
        self.last_trigger_timestamp[camera] = ts
        self.triggers_in_the_past_seven_min[camera] = 0
        try:
            self.triggers_in_the_past_seven_min[camera] = self._count_recent_history(
                camera, now
            )
        except OSError as e:
            # The trigger itself is real; without a readable history we simply
            # cannot tell whether the camera is spammy, so leave the count at 0.
            print(e)
        return True

    def get_triggered_page_list(self) -> Optional[List[Tuple[str, int]]]:
        """Return a list of triggered pages with priorities.

        Returns:
            A list of (page, priority) tuples, one per camera whose last
            recorded trigger is younger than _TRIGGER_WINDOW_SEC and which is
            not squelched, or None when there are no such cameras.  Errors are
            printed rather than raised.
        """
        triggers: List[Tuple[str, int]] = []
        now = time.time()
        try:
            # First pass, just see whether each camera is triggered
            # and, if so, count how many times in the past 7m it has
            # been triggered.  A failure on one camera must not stop
            # the others from being examined.
            num_cameras_with_recent_triggers = 0
            for camera in self._CAMERAS:
                try:
                    if self._refresh_camera(camera, now):
                        num_cameras_with_recent_triggers += 1
                except OSError as e:
                    print(e)

            # Second pass, see whether we want to trigger due to
            # camera activity we found.  All cameras timestamps were
            # just considered and should be up-to-date.  Some logic to
            # squelch spammy cameras unless more than one is triggered
            # at the same time.
            for camera in self._CAMERAS:
                # Age of *this* camera's trigger; this is what the priority
                # must be derived from.
                age = now - self.last_trigger_timestamp[camera]
                if age >= self._TRIGGER_WINDOW_SEC:
                    continue

                # Fetched before branching so both log lines carry it.
                ts = utils.timestamp()
                if (
                    self.triggers_in_the_past_seven_min[camera]
                    <= self._MAX_RECENT_TRIGGERS
                    or num_cameras_with_recent_triggers > 1
                ):
                    priority = self.choose_priority(camera, int(age))
                    print(f"{ts}: ****** {camera}[{priority}] CAMERA TRIGGER ******")
                    triggers.append((f"hidden/{camera}.html", priority))
                else:
                    print(f"{ts}: Camera {camera} too spammy, squelching it")
        except Exception as e:
            # Preserve this method's never-raise behavior: report the problem
            # and fall through to return whatever was collected so far.
            print(e)

        # An empty result is reported as None, not as an empty list.
        return triggers if triggers else None
102
103
104 # x = any_camera_trigger()
105 # print(x.get_triggered_page_list())