#!/usr/bin/env python3

# © Copyright 2021-2022, Scott Gasch

"""A caching layer around the kernel's network mapping between IPs and MACs"""

import datetime
import logging
import os
import warnings
from typing import Any, Optional

from overrides import overrides

import argparse_utils
import config
import exec_utils
import file_utils
import persistent
import site_config
import string_utils
from collect.bidict import BiDict

logger = logging.getLogger(__name__)

# Command-line flags controlling where the cache lives, how long it stays
# fresh, and how many entries it must hold to be considered usable.
cfg = config.add_commandline_args(
    f'MAC <--> IP Address mapping table cache ({__file__})',
    'Commandline args related to MAC <--> IP Address mapping',
)
cfg.add_argument(
    '--arper_cache_location',
    default=site_config.get_config().arper_cache_file,
    metavar='FILENAME',
    help='Where to cache the kernel ARP table',
)
cfg.add_argument(
    '--arper_supplimental_cache_location',
    default=site_config.get_config(site_config.other_location()).arper_cache_file,
    metavar='FILENAME',
    help='Where someone else is caching the kernel ARP table',
)
cfg.add_argument(
    '--arper_cache_max_staleness',
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(seconds=60 * 30),
    metavar='DURATION',
    help='Max acceptable age of the kernel arp table cache',
)
cfg.add_argument(
    '--arper_min_entries_to_be_valid',
    type=int,
    default=site_config.get_config().arper_minimum_device_count,
    help='Min number of arp entries to bother persisting.',
)


@persistent.persistent_autoloaded_singleton()  # type: ignore
class Arper(persistent.Persistent):
    """A caching layer around the kernel's network mapping between IPs
    and MACs.

    This class restores persisted state that expires periodically (see
    --arper_cache_max_staleness) at program startup time.  If it's
    unable to use the file's contents, it queries the kernel (via arp)
    and uses an auxiliary utility called arp-scan to query the network.
    If it has to do this there's a latency hit but it persists the
    collected data in the cache file.  Either way, the class behaves as
    a global singleton hosting this data thereafter.

    The --arper_min_entries_to_be_valid threshold is treated as an
    inclusive minimum everywhere: a table holding exactly that many
    entries is accepted by the constructor, by :meth:`load` and by
    :meth:`save`.
    """

    def __init__(
        self,
        cached_local_state: Optional[BiDict] = None,
        cached_supplimental_state: Optional[BiDict] = None,
    ) -> None:
        """For most purposes, ignore the arguments.  Because this is a
        Persistent subclass the decorator will handle invoking our load
        and save methods to read/write persistent state transparently.

        Args:
            cached_local_state: local state to initialize mapping
            cached_supplimental_state: remote state to initialize mapping

        Raises:
            Exception: if, after using the cached local state or scanning
                the network, fewer than --arper_min_entries_to_be_valid
                entries are known.
        """
        self.state = BiDict()
        if cached_local_state is not None:
            logger.debug('Loading Arper map from cached local state.')
            self.state = cached_local_state
        else:
            logger.debug('No usable cached state; calling /usr/sbin/arp')
            self._update_from_arp_scan()
            self._update_from_arp()
        if len(self.state) < config.config['arper_min_entries_to_be_valid']:
            raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.')

        # Supplimental entries are merged after the size check, so they do
        # not count towards the minimum.
        if cached_supplimental_state is not None:
            logger.debug('Also added %d supplimental entries.', len(cached_supplimental_state))
            for mac, ip in cached_supplimental_state.items():
                self.state[mac] = ip
        for mac, ip in self.state.items():
            logger.debug('%s <-> %s', mac, ip)

    def _ingest_arp_output(self, output: str) -> None:
        """Internal helper: record every MAC/IP pair found in tool output.

        Each line is scanned independently for an IPv4 address and a MAC
        address; lines lacking either one, or reporting 'UNKNOWN' for
        either one, are ignored.  MACs are lowercased before being stored.

        Args:
            output: the raw text printed by arp or arp-scan.
        """
        for line in output.split('\n'):
            ip = string_utils.extract_ip_v4(line)
            mac = string_utils.extract_mac_address(line)
            if ip is None or mac is None:
                continue
            if mac == 'UNKNOWN' or ip == 'UNKNOWN':
                continue
            mac = mac.lower()
            logger.debug('ARPER: %s => %s', mac, ip)
            self.state[mac] = ip

    def _update_from_arp_scan(self) -> None:
        """Internal method to initialize our state via a call to arp-scan.

        Failures running the tool are logged and otherwise ignored; the
        state is simply left as it was.
        """
        network_spec = site_config.get_config().network
        try:
            output = exec_utils.cmd(
                f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}',
                timeout_seconds=10.0,
            )
        except Exception as e:
            logger.exception(e)
            return
        self._ingest_arp_output(output)

    def _update_from_arp(self) -> None:
        """Internal method to initialize our state via a call to arp.

        Failures running the tool are logged and otherwise ignored; the
        state is simply left as it was.
        """
        try:
            output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0)
        except Exception as e:
            logger.exception(e)
            return
        self._ingest_arp_output(output)

    def get_ip_by_mac(self, mac: str) -> Optional[str]:
        """Given a MAC address, see if we know its IP address and, if
        so, return it.  If not, return None.

        Args:
            mac: the MAC address to lookup.  Should be formatted like
                ab:cd:ef:01:23:45.

        Returns:
            The IPv4 address associated with that MAC address (as a
            string) or None if it's not known.
        """
        m = string_utils.extract_mac_address(mac)
        if not m:
            return None
        # Keys are stored lowercased, so normalize before looking up.
        m = m.lower()
        if not string_utils.is_mac_address(m):
            return None
        return self.state.get(m, None)

    def get_mac_by_ip(self, ip: str) -> Optional[str]:
        """Given an IPv4 address (as a string), check to see if we know
        what MAC address is associated with it and, if so, return it.
        If not, return None.

        Args:
            ip: the IPv4 address to look up.

        Returns:
            The associated MAC address, if known.  Or None if not.
        """
        return self.state.inverse.get(ip, None)

    @classmethod
    def _load_state(
        cls,
        cache_file: str,
        freshness_threshold_sec: int,
        state: BiDict,
    ) -> None:
        """Internal helper method behind load.

        Reads "mac, ip" lines from cache_file into state, provided the
        file is readable and was written recently enough.  Blank lines
        are skipped silently and malformed lines are skipped with a
        warning, so one bad line does not discard the rest of the cache.

        Args:
            cache_file: path of the cache file to read.
            freshness_threshold_sec: maximum acceptable age of the file,
                in seconds.
            state: the mapping to populate in place (MAC -> IP).
        """
        if not file_utils.file_is_readable(cache_file):
            logger.debug('Can\'t read %s', cache_file)
            return
        if not persistent.was_file_written_within_n_seconds(
            cache_file,
            freshness_threshold_sec,
        ):
            logger.debug('%s is too stale.', cache_file)
            return

        logger.debug('Loading state from %s', cache_file)
        with open(cache_file, 'r') as rf:
            for raw_line in rf:
                # Strip only the line terminator; the final line of the
                # file may legitimately lack one.
                line = raw_line.rstrip('\r\n')
                logger.debug('ARPER:%s> %s', cache_file, line)
                if not line.strip():
                    continue
                mac, sep, ip = line.partition(',')
                mac = mac.strip().lower()
                ip = ip.strip()
                if not sep or not mac or not ip or ',' in ip:
                    logger.warning('Skipping malformed line in %s: %r', cache_file, line)
                    continue
                state[mac] = ip

    @classmethod
    @overrides
    def load(cls) -> Any:
        """Internal helper method to fulfill Persistent requirements.

        Returns:
            An Arper built from the cached local state (plus whatever
            supplimental state could be read), or None when the local
            cache is missing, stale, empty or holds fewer than
            --arper_min_entries_to_be_valid entries.
        """
        min_entries = config.config['arper_min_entries_to_be_valid']
        max_staleness = config.config['arper_cache_max_staleness'].total_seconds()

        local_state: BiDict = BiDict()
        cache_file = config.config['arper_cache_location']
        logger.debug('Trying to load main arper cache from %s...', cache_file)
        cls._load_state(cache_file, max_staleness, local_state)
        if len(local_state) == 0 or len(local_state) < min_entries:
            msg = f'{cache_file} is invalid: only {len(local_state)} entries. Deleting it.'
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            try:
                os.remove(cache_file)
            except OSError as e:
                # Best effort: the file may not exist or may not be ours.
                logger.debug('Unable to remove %s: %s', cache_file, e)
            # Do not hand rejected data to the constructor: it would raise
            # for an undersized table.  None means "nothing usable loaded".
            # NOTE(review): presumably the persistent decorator then builds
            # a fresh instance -- confirm against persistent.py.
            return None

        supplimental_state: BiDict = BiDict()
        cache_file = config.config['arper_supplimental_cache_location']
        logger.debug('Trying to suppliment arper state from %s', cache_file)
        cls._load_state(cache_file, max_staleness, supplimental_state)
        return cls(local_state, supplimental_state)

    @overrides
    def save(self) -> bool:
        """Internal helper method to fulfill Persistent requirements.

        Returns:
            True if the state held at least
            --arper_min_entries_to_be_valid entries and was written to
            --arper_cache_location; False if it was too small to bother
            persisting.
        """
        min_entries = config.config['arper_min_entries_to_be_valid']
        if len(self.state) < min_entries:
            logger.warning(
                'Only saw %d entries; needed at least %d to bother persisting.',
                len(self.state),
                min_entries,
            )
            return False

        logger.debug('Persisting state to %s', config.config['arper_cache_location'])
        with file_utils.FileWriter(config.config['arper_cache_location']) as wf:
            for mac, ip in self.state.items():
                mac = mac.lower()
                print(f'{mac}, {ip}', file=wf)
        return True