73d1dd654bf1961bbe6bdc52ec8fd99d3d7db064
[python_utils.git] / arper.py
1 #!/usr/bin/env python3
2
3 import datetime
4 import logging
5 import os
6 from typing import Any, Optional
7
8 import argparse_utils
9 from collect.bidict import BiDict
10 import config
11 import exec_utils
12 import persistent
13 import string_utils
14
15 logger = logging.getLogger(__name__)
16
17 cfg = config.add_commandline_args(
18     f'MAC <--> IP Address mapping table cache ({__file__})',
19     'Commandline args related to MAC <--> IP Address mapping',
20 )
21 cfg.add_argument(
22     '--arper_cache_location',
23     default=f'{os.environ["HOME"]}/cache/.arp_table_cache',
24     metavar='FILENAME',
25     help='Where to cache the kernel ARP table',
26 )
27 cfg.add_argument(
28     '--arp_cache_max_staleness',
29     type=argparse_utils.valid_duration,
30     default=datetime.timedelta(seconds=60 * 5),
31     metavar='DURATION',
32     help='Max acceptable age of the kernel arp table cache'
33 )
34
35
36 @persistent.persistent_autoloaded_singleton()
37 class Arper(persistent.Persistent):
38     def __init__(self, cached_state: Optional[BiDict[str, str]] = None) -> None:
39         self.state = BiDict()
40         if cached_state is not None:
41             logger.debug('Loading Arper map from cached state.')
42             self.state = cached_state
43         else:
44             logger.debug('No usable cached state; calling /usr/sbin/arp')
45             output = exec_utils.cmd(
46                 '/usr/sbin/arp -a',
47                 timeout_seconds=5.0
48             )
49             for line in output.split('\n'):
50                 line = str(line, 'ascii')
51                 ip = string_utils.extract_ip_v4(line)
52                 mac = string_utils.extract_mac_address(line)
53                 mac = mac.lower()
54                 logger.debug(f'    {mac} => {ip}')
55                 self.state[mac] = ip
56
57     def get_ip_by_mac(self, mac: str) -> Optional[str]:
58         mac = mac.lower()
59         return self.state.get(mac, None)
60
61     def get_mac_by_ip(self, ip: str) -> Optional[str]:
62         return self.state.inverse.get(ip, None)
63
64     def save(self) -> bool:
65         logger.debug(f'Persisting state to {config.config["arp_cache_location"]}')
66         with open(config.config['arp_cache_location'], 'w') as wf:
67             for (mac, ip) in self.state.items():
68                 mac = mac.lower()
69                 print(f'{mac}, {ip}', file=wf)
70
71     @classmethod
72     def load(cls) -> Any:
73         if persistent.was_file_written_within_n_seconds(
74                 config.config['arp_cache_location'],
75                 config.config['arp_cache_max_staleness'].total_seconds(),
76         ):
77             logger.debug(f'Loading state from {config.config["arp_cache_location"]}')
78             cached_state = BiDict()
79             with open(config.config['arp_cache_location'], 'r') as rf:
80                 for line in rd.readline():
81                     (mac, ip) = line.split(',')
82                     mac = mac.strip()
83                     mac = mac.lower()
84                     ip = ip.strip()
85                     cached_state[mac] = ip
86             return Arper(cached_state)
87         logger.debug('No usable saved state found')
88         return None