3 """A caching layer around the kernel's network mapping between IPs and MACs"""
8 from typing import Any, Optional
10 from overrides import overrides
13 from collect.bidict import BiDict
21 logger = logging.getLogger(__name__)
23 cfg = config.add_commandline_args(
24 f'MAC <--> IP Address mapping table cache ({__file__})',
25 'Commandline args related to MAC <--> IP Address mapping',
28 '--arper_cache_location',
29 default=f'{os.environ["HOME"]}/cache/.arp_table_cache',
31 help='Where to cache the kernel ARP table',
34 '--arper_cache_max_staleness',
35 type=argparse_utils.valid_duration,
36 default=datetime.timedelta(seconds=60 * 60),
38 help='Max acceptable age of the kernel arp table cache'
41 '--arper_min_entries_to_be_valid',
43 default=site_config.get_config().arper_minimum_device_count,
44 help='Min number of arp entries to bother persisting.'
48 @persistent.persistent_autoloaded_singleton()
49 class Arper(persistent.Persistent):
51 self, cached_state: Optional[BiDict[str, str]] = None
54 if cached_state is not None:
55 logger.debug('Loading Arper map from cached state.')
56 self.state = cached_state
58 logger.debug('No usable cached state; calling /usr/sbin/arp')
62 output = exec_utils.cmd(
66 for line in output.split('\n'):
67 ip = string_utils.extract_ip_v4(line)
68 mac = string_utils.extract_mac_address(line)
69 if ip is not None and mac is not None:
71 logger.debug(f' {mac} => {ip}')
74 def get_ip_by_mac(self, mac: str) -> Optional[str]:
76 return self.state.get(mac, None)
78 def get_mac_by_ip(self, ip: str) -> Optional[str]:
79 return self.state.inverse.get(ip, None)
82 def save(self) -> bool:
83 if len(self.state) > config.config['arper_min_entries_to_be_valid']:
85 f'Persisting state to {config.config["arper_cache_location"]}'
87 with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
88 for (mac, ip) in self.state.items():
90 print(f'{mac}, {ip}', file=wf)
94 f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
100 def load(cls) -> Any:
101 cache_file = config.config['arper_cache_location']
102 if persistent.was_file_written_within_n_seconds(
104 config.config['arper_cache_max_staleness'].total_seconds(),
106 logger.debug(f'Loading state from {cache_file}')
107 cached_state = BiDict()
108 with open(cache_file, 'r') as rf:
109 contents = rf.readlines()
110 for line in contents:
112 logger.debug(f'ARPER:{cache_file}> {line}')
113 (mac, ip) = line.split(',')
117 cached_state[mac] = ip
118 if len(cached_state) > config.config['arper_min_entries_to_be_valid']:
119 return cls(cached_state)
122 f'{cache_file} sucks, only {len(cached_state)} entries. Deleting it.'
124 os.remove(cache_file)
126 logger.debug('No usable saved state found')