6 from typing import Any, Optional
9 from collect.bidict import BiDict
15 logger = logging.getLogger(__name__)
17 cfg = config.add_commandline_args(
18 f'MAC <--> IP Address mapping table cache ({__file__})',
19 'Commandline args related to MAC <--> IP Address mapping',
22 '--arper_cache_location',
23 default=f'{os.environ["HOME"]}/cache/.arp_table_cache',
25 help='Where to cache the kernel ARP table',
28 '--arper_cache_max_staleness',
29 type=argparse_utils.valid_duration,
30 default=datetime.timedelta(seconds=60 * 60),
32 help='Max acceptable age of the kernel arp table cache'
36 @persistent.persistent_autoloaded_singleton()
37 class Arper(persistent.Persistent):
39 self, cached_state: Optional[BiDict[str, str]] = None
42 if cached_state is not None:
43 logger.debug('Loading Arper map from cached state.')
44 self.state = cached_state
46 logger.debug('No usable cached state; calling /usr/sbin/arp')
47 output = exec_utils.cmd(
51 for line in output.split('\n'):
52 ip = string_utils.extract_ip_v4(line)
53 mac = string_utils.extract_mac_address(line)
54 if ip is not None and mac is not None:
56 logger.debug(f' {mac} => {ip}')
def get_ip_by_mac(self, mac: str) -> Optional[str]:
    """Look up the IP address recorded for a MAC address.

    Args:
        mac: the MAC address to look up; it is used as the key into
            the forward (MAC -> IP) mapping held in self.state.

    Returns:
        The IP address stored for mac, or None when mac has no entry.
    """
    mac_to_ip = self.state
    return mac_to_ip.get(mac, None)
def get_mac_by_ip(self, ip: str) -> Optional[str]:
    """Look up the MAC address recorded for an IP address.

    Args:
        ip: the IP address to look up; it is used as the key into the
            inverse (IP -> MAC) view of the mapping held in self.state.

    Returns:
        The MAC address stored for ip, or None when ip has no entry.
    """
    ip_to_mac = self.state.inverse
    return ip_to_mac.get(ip, None)
66 def save(self) -> bool:
68 f'Persisting state to {config.config["arper_cache_location"]}'
70 with open(config.config['arper_cache_location'], 'w') as wf:
71 for (mac, ip) in self.state.items():
73 print(f'{mac}, {ip}', file=wf)
77 if persistent.was_file_written_within_n_seconds(
78 config.config['arper_cache_location'],
79 config.config['arper_cache_max_staleness'].total_seconds(),
82 f'Loading state from {config.config["arper_cache_location"]}'
84 cached_state = BiDict()
85 with open(config.config['arper_cache_location'], 'r') as rf:
86 contents = rf.readlines()
88 logger.debug(f'ARPER> {line}')
89 (mac, ip) = line.split(',')
93 cached_state[mac] = ip
94 return cls(cached_state)
95 logger.debug('No usable saved state found')