#!/usr/bin/env python3

"""Cache of the kernel ARP table as a bidirectional MAC <--> IP mapping."""

import datetime
import logging
import os
from typing import Any, Optional

import argparse_utils
from collect.bidict import BiDict
import config
import exec_utils
import persistent
import string_utils

logger = logging.getLogger(__name__)

cfg = config.add_commandline_args(
    f'MAC <--> IP Address mapping table cache ({__file__})',
    'Commandline args related to MAC <--> IP Address mapping',
)
cfg.add_argument(
    '--arper_cache_location',
    default=f'{os.environ["HOME"]}/cache/.arp_table_cache',
    metavar='FILENAME',
    help='Where to cache the kernel ARP table',
)
cfg.add_argument(
    '--arp_cache_max_staleness',
    type=argparse_utils.valid_duration,
    default=datetime.timedelta(seconds=60 * 5),
    metavar='DURATION',
    help='Max acceptable age of the kernel arp table cache',
)


@persistent.persistent_autoloaded_singleton()
class Arper(persistent.Persistent):
    """Bidirectional MAC <--> IPv4 lookup table.

    The table is either restored from a previously saved cache file (see
    :meth:`load`) or built by parsing the output of ``/usr/sbin/arp -a``.
    MAC addresses are stored lowercased.
    """

    def __init__(self, cached_state: Optional[BiDict[str, str]] = None) -> None:
        """Populate the table.

        Args:
            cached_state: a ready-made MAC -> IP mapping; when None the
                table is rebuilt by running ``/usr/sbin/arp -a``.
        """
        self.state = BiDict()
        if cached_state is not None:
            logger.debug('Loading Arper map from cached state.')
            self.state = cached_state
            return

        logger.debug('No usable cached state; calling /usr/sbin/arp')
        output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=5.0)
        # NOTE(review): exec_utils.cmd is presumed to return text; decode
        # defensively in case it hands back raw bytes instead.
        if isinstance(output, bytes):
            output = output.decode('ascii')
        for line in output.split('\n'):
            ip = string_utils.extract_ip_v4(line)
            mac = string_utils.extract_mac_address(line)
            # Not every line carries both fields (e.g. the empty piece after
            # a trailing newline); skip those instead of crashing.
            if not ip or not mac:
                continue
            mac = mac.lower()
            logger.debug(f' {mac} => {ip}')
            self.state[mac] = ip

    def get_ip_by_mac(self, mac: str) -> Optional[str]:
        """Return the IP mapped to ``mac`` (case-insensitive), or None."""
        return self.state.get(mac.lower(), None)

    def get_mac_by_ip(self, ip: str) -> Optional[str]:
        """Return the MAC mapped to ``ip``, or None."""
        return self.state.inverse.get(ip, None)

    def save(self) -> bool:
        """Write the table to the cache file, one ``mac, ip`` pair per line.

        Returns:
            True once the file has been written; I/O errors propagate.
        """
        # The key must match the --arper_cache_location flag registered above.
        cache_file = config.config['arper_cache_location']
        logger.debug(f'Persisting state to {cache_file}')
        with open(cache_file, 'w') as wf:
            for mac, ip in self.state.items():
                print(f'{mac.lower()}, {ip}', file=wf)
        return True

    @classmethod
    def load(cls) -> Any:
        """Rebuild an instance from the cache file if it is fresh enough.

        Returns:
            An instance populated from the cache file, or None when the file
            was not written within ``--arp_cache_max_staleness``.
        """
        cache_file = config.config['arper_cache_location']
        max_age = config.config['arp_cache_max_staleness'].total_seconds()
        if not persistent.was_file_written_within_n_seconds(cache_file, max_age):
            logger.debug('No usable saved state found')
            return None

        logger.debug(f'Loading state from {cache_file}')
        cached_state = BiDict()
        with open(cache_file, 'r') as rf:
            for line in rf:
                mac, sep, ip = line.partition(',')
                mac = mac.strip().lower()
                ip = ip.strip()
                # Ignore blank or malformed lines rather than aborting.
                if not sep or not mac or not ip:
                    continue
                cached_state[mac] = ip
        # Use cls rather than the module-level name: the class decorator may
        # have rebound ``Arper`` to something other than this class.
        return cls(cached_state)