#!/usr/bin/env python3 """A caching layer around the kernel's network mapping between IPs and MACs""" import datetime import logging import os import warnings from typing import Any, Optional from overrides import overrides import argparse_utils import config import exec_utils import file_utils import persistent import site_config import string_utils from collect.bidict import BiDict logger = logging.getLogger(__name__) cfg = config.add_commandline_args( f'MAC <--> IP Address mapping table cache ({__file__})', 'Commandline args related to MAC <--> IP Address mapping', ) cfg.add_argument( '--arper_cache_location', default=site_config.get_config().arper_cache_file, metavar='FILENAME', help='Where to cache the kernel ARP table', ) cfg.add_argument( '--arper_supplimental_cache_location', default=site_config.get_config(site_config.other_location()).arper_cache_file, metavar='FILENAME', help='Where someone else is caching the kernel ARP table', ) cfg.add_argument( '--arper_cache_max_staleness', type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 30), metavar='DURATION', help='Max acceptable age of the kernel arp table cache', ) cfg.add_argument( '--arper_min_entries_to_be_valid', type=int, default=site_config.get_config().arper_minimum_device_count, help='Min number of arp entries to bother persisting.', ) @persistent.persistent_autoloaded_singleton() # type: ignore class Arper(persistent.Persistent): """A caching layer around the kernel's network mapping between IPs and MACs. This class restores persisted state that expires periodically (see --arper_cache_max_staleness) at program startup time. If it's unable to use the file's contents, it queries the kernel (via arp) and uses an auxillary utility called arp-scan to query the network. If it has to do this there's a latency hit but it persists the collected data in the cache file. Either way, the class behaves as a global singleton hosting this data thereafter. """ def __init__( self, cached_local_state: Optional[BiDict] = None, cached_supplimental_state: Optional[BiDict] = None, ) -> None: self.state = BiDict() if cached_local_state is not None: logger.debug('Loading Arper map from cached local state.') self.state = cached_local_state else: logger.debug('No usable cached state; calling /usr/sbin/arp') self.update_from_arp_scan() self.update_from_arp() if len(self.state) < config.config['arper_min_entries_to_be_valid']: raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.') if cached_supplimental_state is not None: logger.debug('Also added %d supplimental entries.', len(cached_supplimental_state)) for mac, ip in cached_supplimental_state.items(): self.state[mac] = ip for mac, ip in self.state.items(): logger.debug('%s <-> %s', mac, ip) def update_from_arp_scan(self): network_spec = site_config.get_config().network try: output = exec_utils.cmd( f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}', timeout_seconds=10.0, ) except Exception as e: logger.exception(e) return for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': mac = mac.lower() logger.debug('ARPER: %s => %s', mac, ip) self.state[mac] = ip def update_from_arp(self): try: output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0) except Exception as e: logger.exception(e) return for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': mac = mac.lower() logger.debug('ARPER: %s => %s', mac, ip) self.state[mac] = ip def get_ip_by_mac(self, mac: str) -> Optional[str]: mac = mac.lower() return self.state.get(mac, None) def get_mac_by_ip(self, ip: str) -> Optional[str]: return self.state.inverse.get(ip, None) @classmethod def load_state( cls, cache_file: str, freshness_threshold_sec: int, state: BiDict, ): if not file_utils.file_is_readable(cache_file): logger.debug('Can\'t read %s', cache_file) return if persistent.was_file_written_within_n_seconds( cache_file, freshness_threshold_sec, ): logger.debug('Loading state from %s', cache_file) count = 0 with open(cache_file, 'r') as rf: contents = rf.readlines() for line in contents: line = line[:-1] logger.debug('ARPER:%s> %s', cache_file, line) (mac, ip) = line.split(',') mac = mac.strip() mac = mac.lower() ip = ip.strip() state[mac] = ip count += 1 else: logger.debug('%s is too stale.', cache_file) @classmethod @overrides def load(cls) -> Any: local_state: BiDict = BiDict() cache_file = config.config['arper_cache_location'] max_staleness = config.config['arper_cache_max_staleness'].total_seconds() logger.debug('Trying to load main arper cache from %s...', cache_file) cls.load_state(cache_file, max_staleness, local_state) if len(local_state) <= config.config['arper_min_entries_to_be_valid']: msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.' logger.warning(msg) warnings.warn(msg, stacklevel=2) try: os.remove(cache_file) except Exception: pass supplimental_state: BiDict = BiDict() cache_file = config.config['arper_supplimental_cache_location'] max_staleness = config.config['arper_cache_max_staleness'].total_seconds() logger.debug('Trying to suppliment arper state from %s', cache_file) cls.load_state(cache_file, max_staleness, supplimental_state) if len(local_state) > 0: return cls(local_state, supplimental_state) return None @overrides def save(self) -> bool: if len(self.state) > config.config['arper_min_entries_to_be_valid']: logger.debug('Persisting state to %s', config.config["arper_cache_location"]) with file_utils.FileWriter(config.config['arper_cache_location']) as wf: for (mac, ip) in self.state.items(): mac = mac.lower() print(f'{mac}, {ip}', file=wf) return True else: logger.warning( 'Only saw %d entries; needed at least %d to bother persisting.', len(self.state), config.config["arper_min_entries_to_be_valid"], ) return False