help='Where to cache the kernel ARP table',
)
cfg.add_argument(
- '--arp_cache_max_staleness',
+ '--arper_cache_max_staleness',
type=argparse_utils.valid_duration,
- default=datetime.timedelta(seconds=60 * 5),
+ default=datetime.timedelta(seconds=60 * 60),
metavar='DURATION',
help='Max acceptable age of the kernel arp table cache'
)
@persistent.persistent_autoloaded_singleton()
class Arper(persistent.Persistent):
- def __init__(self, cached_state: Optional[BiDict[str, str]] = None) -> None:
+ def __init__(
+ self, cached_state: Optional[BiDict[str, str]] = None
+ ) -> None:
self.state = BiDict()
if cached_state is not None:
logger.debug('Loading Arper map from cached state.')
timeout_seconds=5.0
)
for line in output.split('\n'):
- line = str(line, 'ascii')
ip = string_utils.extract_ip_v4(line)
mac = string_utils.extract_mac_address(line)
- mac = mac.lower()
- logger.debug(f' {mac} => {ip}')
- self.state[mac] = ip
+ if ip is not None and mac is not None:
+ mac = mac.lower()
+ logger.debug(f' {mac} => {ip}')
+ self.state[mac] = ip
def get_ip_by_mac(self, mac: str) -> Optional[str]:
mac = mac.lower()
return self.state.inverse.get(ip, None)
def save(self) -> bool:
- logger.debug(f'Persisting state to {config.config["arp_cache_location"]}')
- with open(config.config['arp_cache_location'], 'w') as wf:
+ logger.debug(
+ f'Persisting state to {config.config["arper_cache_location"]}'
+ )
+ with open(config.config['arper_cache_location'], 'w') as wf:
for (mac, ip) in self.state.items():
mac = mac.lower()
print(f'{mac}, {ip}', file=wf)
@classmethod
def load(cls) -> Any:
if persistent.was_file_written_within_n_seconds(
- config.config['arp_cache_location'],
- config.config['arp_cache_max_staleness'].total_seconds(),
+ config.config['arper_cache_location'],
+ config.config['arper_cache_max_staleness'].total_seconds(),
):
- logger.debug(f'Loading state from {config.config["arp_cache_location"]}')
+ logger.debug(
+ f'Loading state from {config.config["arper_cache_location"]}'
+ )
cached_state = BiDict()
- with open(config.config['arp_cache_location'], 'r') as rf:
- for line in rd.readline():
+ with open(config.config['arper_cache_location'], 'r') as rf:
+ contents = rf.readlines()
+ for line in contents:
+ logger.debug(f'ARPER> {line}')
(mac, ip) = line.split(',')
mac = mac.strip()
mac = mac.lower()
ip = ip.strip()
cached_state[mac] = ip
- return Arper(cached_state)
+ return cls(cached_state)
logger.debug('No usable saved state found')
return None