d74b659b857beb74c12057a30e07ea57955a2b7a
[python_utils.git] / arper.py
1 #!/usr/bin/env python3
2
3 import datetime
4 import logging
5 import os
6 from typing import Any, Optional
7
8 import argparse_utils
9 from collect.bidict import BiDict
10 import config
11 import exec_utils
12 import persistent
13 import string_utils
14
logger = logging.getLogger(__name__)

# Register this module's commandline flags with the shared config module.
cfg = config.add_commandline_args(
    f'MAC <--> IP Address mapping table cache ({__file__})',
    'Commandline args related to MAC <--> IP Address mapping',
)
cfg.add_argument(
    '--arper_cache_location',
    # expanduser() rather than os.environ["HOME"]: the latter raises KeyError
    # at import time when HOME is unset (e.g. cron or minimal service
    # environments), whereas expanduser() falls back to the password database.
    default=os.path.join(
        os.path.expanduser('~'), 'cache', '.arp_table_cache'
    ),
    metavar='FILENAME',
    help='Where to cache the kernel ARP table',
)
cfg.add_argument(
    '--arper_cache_max_staleness',
    type=argparse_utils.valid_duration,
    # Default: a cache file older than one hour is not used.
    default=datetime.timedelta(seconds=60 * 60),
    metavar='DURATION',
    help='Max acceptable age of the kernel arp table cache',
)
34
35
@persistent.persistent_autoloaded_singleton()
class Arper(persistent.Persistent):
    """Bidirectional lookup table between MAC addresses and IPv4 addresses.

    The mapping is held in ``self.state``, a BiDict keyed by lowercase MAC
    address with the IP address as the value; the BiDict's ``inverse`` view
    is used for IP -> MAC lookups.

    The table is populated either from a previously saved cache file (see
    ``load``) or, when no cached state is supplied, by parsing the output of
    ``/usr/sbin/arp -a``.  ``save`` writes the table back to the cache file
    named by the ``--arper_cache_location`` flag.

    NOTE(review): when ``load``/``save`` are invoked is decided by the
    ``persistent`` decorator, which is not visible in this file.
    """

    def __init__(
            self, cached_state: Optional[BiDict[str, str]] = None
    ) -> None:
        """Build the table from cached state or from the system ARP table.

        Args:
            cached_state: a MAC -> IP BiDict to adopt as-is.  When None, the
                table is built by running ``/usr/sbin/arp -a``.
        """
        if cached_state is not None:
            logger.debug('Loading Arper map from cached state.')
            self.state = cached_state
            return

        self.state = BiDict()
        logger.debug('No usable cached state; calling /usr/sbin/arp')
        output = exec_utils.cmd(
            '/usr/sbin/arp -a',
            timeout_seconds=5.0
        )
        for line in output.split('\n'):
            ip = string_utils.extract_ip_v4(line)
            mac = string_utils.extract_mac_address(line)
            # Only lines yielding both an IP and a MAC become entries.
            if ip is not None and mac is not None:
                mac = mac.lower()
                logger.debug(f'    {mac} => {ip}')
                self.state[mac] = ip

    def get_ip_by_mac(self, mac: str) -> Optional[str]:
        """Return the IP address mapped to mac, or None if it is unknown.

        The lookup is case-insensitive: mac is lowercased before the lookup
        to match how keys are stored.
        """
        mac = mac.lower()
        return self.state.get(mac, None)

    def get_mac_by_ip(self, ip: str) -> Optional[str]:
        """Return the (lowercase) MAC mapped to ip, or None if it is unknown."""
        return self.state.inverse.get(ip, None)

    def save(self) -> bool:
        """Write the table to the configured cache file, one pair per line.

        Each line has the form ``<mac>, <ip>``, the format ``load`` parses.

        Returns:
            True once the file has been written.  Errors opening or writing
            the file are not caught here and propagate to the caller.
        """
        location = config.config['arper_cache_location']
        logger.debug(f'Persisting state to {location}')
        with open(location, 'w') as wf:
            for (mac, ip) in self.state.items():
                print(f'{mac.lower()}, {ip}', file=wf)
        # The signature promises a bool; previously this fell off the end
        # and returned None.
        return True

    @classmethod
    def load(cls) -> Any:
        """Construct an Arper from the cache file if it is fresh enough.

        The cache file is used only when
        ``persistent.was_file_written_within_n_seconds`` accepts it for the
        ``--arper_cache_max_staleness`` duration.  Blank lines and lines
        that are not a single ``<mac>, <ip>`` pair are skipped rather than
        aborting the whole load.

        Returns:
            A new instance built from the cached pairs, or None when the
            cache file is not usable.
        """
        location = config.config['arper_cache_location']
        max_age = config.config['arper_cache_max_staleness'].total_seconds()
        if not persistent.was_file_written_within_n_seconds(
                location, max_age
        ):
            logger.debug('No usable saved state found')
            return None

        logger.debug(f'Loading state from {location}')
        cached_state = BiDict()
        with open(location, 'r') as rf:
            for raw_line in rf:
                line = raw_line.strip()
                if not line:
                    continue
                logger.debug(f'ARPER> {line}')
                mac, sep, ip = line.partition(',')
                mac = mac.strip().lower()
                ip = ip.strip()
                # Require exactly one comma with non-empty text on both
                # sides; anything else is a corrupt record.
                if not sep or not mac or not ip or ',' in ip:
                    logger.warning(
                        f'Skipping malformed line in {location}: {line!r}'
                    )
                    continue
                cached_state[mac] = ip
        return cls(cached_state)
95         logger.debug('No usable saved state found')
96         return None