#!/usr/bin/env python3

"""A caching layer around the kernel's network mapping between IPs and MACs"""

import datetime
import logging
import os
from typing import Any, Optional
import warnings

from overrides import overrides

import argparse_utils
from collect.bidict import BiDict
import config
import exec_utils
import file_utils
import persistent
import string_utils
import site_config

logger = logging.getLogger(__name__)

cfg = config.add_commandline_args(
    f'MAC <--> IP Address mapping table cache ({__file__})',
    'Commandline args related to MAC <--> IP Address mapping',
)
cfg.add_argument(
    '--arper_cache_location',
    default=f'{os.environ["HOME"]}/cache/.arp_table_cache',
    metavar='FILENAME',
    help='Where to cache the kernel ARP table',
)
cfg.add_argument(
    '--arper_cache_max_staleness',
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(seconds=60 * 15),
    metavar='DURATION',
    help='Max acceptable age of the kernel arp table cache',
)
cfg.add_argument(
    '--arper_min_entries_to_be_valid',
    type=int,
    default=site_config.get_config().arper_minimum_device_count,
    help='Min number of arp entries to bother persisting.',
)


@persistent.persistent_autoloaded_singleton()
class Arper(persistent.Persistent):
    """A MAC <--> IP address mapping with an on-disk cache.

    The mapping lives in ``self.state`` (a ``BiDict`` keyed by lowercase MAC
    address with the IP address as the value).  It is populated either from
    a previously saved cache file (see :meth:`load`) or by running
    ``arp-scan`` and ``arp -a`` and parsing their output.
    """

    def __init__(self, cached_state: Optional[BiDict] = None) -> None:
        """Build the mapping from cached state or by querying the network.

        Args:
            cached_state: a previously persisted MAC -> IP mapping.  When
                None, the table is rebuilt by calling ``arp-scan`` followed
                by ``arp -a``.

        Raises:
            Exception: if the resulting table has fewer entries than the
                ``arper_min_entries_to_be_valid`` config value.
        """
        self.state = BiDict()
        if cached_state is not None:
            logger.debug('Loading Arper map from cached state.')
            self.state = cached_state
        else:
            logger.debug('No usable cached state; calling /usr/sbin/arp')
            self.update_from_arp_scan()
            self.update_from_arp()
        if len(self.state) < config.config['arper_min_entries_to_be_valid']:
            raise Exception(
                f'Arper didn\'t find enough entries; only got {len(self.state)}.'
            )

    def _ingest_arp_output(self, output: str) -> None:
        """Parse command output line by line and record each MAC/IP pair."""
        for line in output.split('\n'):
            ip = string_utils.extract_ip_v4(line)
            mac = string_utils.extract_mac_address(line)
            if ip is None or mac is None:
                continue
            if mac == 'UNKNOWN' or ip == 'UNKNOWN':
                continue
            # MACs are stored lowercase so lookups are case-insensitive.
            mac = mac.lower()
            logger.debug(f'ARPER: {mac} => {ip}')
            self.state[mac] = ip

    def update_from_arp_scan(self) -> None:
        """Merge results of ``arp-scan`` over the site's network into state.

        Failures to run the command are logged and otherwise ignored
        (best effort); the existing state is left untouched.
        """
        network_spec = site_config.get_config().network
        try:
            output = exec_utils.cmd(
                f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}',
                timeout_seconds=10.0,
            )
        except Exception as e:
            logger.exception(e)
            return
        self._ingest_arp_output(output)

    def update_from_arp(self) -> None:
        """Merge results of ``/usr/sbin/arp -a`` into state.

        Failures to run the command are logged and otherwise ignored
        (best effort); the existing state is left untouched.
        """
        try:
            output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0)
        except Exception as e:
            logger.exception(e)
            return
        self._ingest_arp_output(output)

    def get_ip_by_mac(self, mac: str) -> Optional[str]:
        """Return the IP mapped to ``mac`` (case-insensitive) or None."""
        mac = mac.lower()
        return self.state.get(mac, None)

    def get_mac_by_ip(self, ip: str) -> Optional[str]:
        """Return the MAC mapped to ``ip`` or None."""
        return self.state.inverse.get(ip, None)

    @classmethod
    @overrides
    def load(cls) -> Any:
        """Try to construct an Arper from the on-disk cache file.

        Returns:
            An ``Arper`` built from the cache when the file is fresh enough
            (``arper_cache_max_staleness``) and holds at least
            ``arper_min_entries_to_be_valid`` entries; otherwise None.  A
            fresh-but-too-small cache file is deleted.
        """
        cache_file = config.config['arper_cache_location']
        if persistent.was_file_written_within_n_seconds(
            cache_file,
            config.config['arper_cache_max_staleness'].total_seconds(),
        ):
            logger.debug(f'Loading state from {cache_file}')
            cached_state = BiDict()
            with open(cache_file, 'r') as rf:
                for raw in rf:
                    # Strip only the line terminator; slicing off the last
                    # character would corrupt a final line that lacks one.
                    line = raw.rstrip('\n')
                    logger.debug(f'ARPER:{cache_file}> {line}')
                    if not line.strip():
                        continue
                    fields = line.split(',')
                    if len(fields) != 2:
                        logger.warning(
                            'Skipping malformed line in %s: %r', cache_file, line
                        )
                        continue
                    mac = fields[0].strip().lower()
                    ip = fields[1].strip()
                    cached_state[mac] = ip
            # ">=" matches __init__, which only rejects tables with fewer
            # than the minimum number of entries.
            if len(cached_state) >= config.config['arper_min_entries_to_be_valid']:
                return cls(cached_state)
            else:
                msg = f'{cache_file} is invalid: only {len(cached_state)} entries. Deleting it.'
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
                try:
                    os.remove(cache_file)
                except OSError as e:
                    # Cleanup is best effort; e.g. the file may already be gone.
                    logger.exception(e)
        logger.debug('No usable saved state found')
        return None

    @overrides
    def save(self) -> bool:
        """Persist the current mapping to the cache file.

        Returns:
            True if the state was written; False if the table holds fewer
            than ``arper_min_entries_to_be_valid`` entries and was therefore
            not persisted.
        """
        if len(self.state) >= config.config['arper_min_entries_to_be_valid']:
            logger.debug(f'Persisting state to {config.config["arper_cache_location"]}')
            with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
                for (mac, ip) in self.state.items():
                    mac = mac.lower()
                    print(f'{mac}, {ip}', file=wf)
            return True
        else:
            logger.warning(
                f'Only saw {len(self.state)} entries; needed at least {config.config["arper_min_entries_to_be_valid"]} to bother persisting.'
            )
            return False