X-Git-Url: https://wannabe.guru.org/gitweb/?a=blobdiff_plain;f=arper.py;h=39aecf90bd438af92d69cffae6f4a3d44ea6aa0e;hb=36fea7f15ed17150691b5b3ead75450e575229ef;hp=2171e773088076aefc6f7d43c80fff59abb9b46f;hpb=b29be4f1750fd20bd2eada88e751dfae85817882;p=python_utils.git diff --git a/arper.py b/arper.py index 2171e77..39aecf9 100644 --- a/arper.py +++ b/arper.py @@ -6,6 +6,7 @@ import datetime import logging import os from typing import Any, Optional +import warnings from overrides import overrides @@ -35,21 +36,19 @@ cfg.add_argument( type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 15), metavar='DURATION', - help='Max acceptable age of the kernel arp table cache' + help='Max acceptable age of the kernel arp table cache', ) cfg.add_argument( '--arper_min_entries_to_be_valid', type=int, default=site_config.get_config().arper_minimum_device_count, - help='Min number of arp entries to bother persisting.' + help='Min number of arp entries to bother persisting.', ) @persistent.persistent_autoloaded_singleton() class Arper(persistent.Persistent): - def __init__( - self, cached_state: Optional[BiDict[str, str]] = None - ) -> None: + def __init__(self, cached_state: Optional[BiDict] = None) -> None: self.state = BiDict() if cached_state is not None: logger.debug('Loading Arper map from cached state.') @@ -59,14 +58,16 @@ class Arper(persistent.Persistent): self.update_from_arp_scan() self.update_from_arp() if len(self.state) < config.config['arper_min_entries_to_be_valid']: - raise Exception('Arper didn\'t find enough entries; only got {len(self.state)}.') + raise Exception( + f'Arper didn\'t find enough entries; only got {len(self.state)}.' + ) def update_from_arp_scan(self): network_spec = site_config.get_config().network try: output = exec_utils.cmd( f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}', - timeout_seconds=10.0 + timeout_seconds=10.0, ) except Exception as e: logger.exception(e) @@ -74,24 +75,31 @@ class Arper(persistent.Persistent): for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': + if ( + ip is not None + and mac is not None + and mac != 'UNKNOWN' + and ip != 'UNKNOWN' + ): mac = mac.lower() logger.debug(f'ARPER: {mac} => {ip}') self.state[mac] = ip def update_from_arp(self): try: - output = exec_utils.cmd( - '/usr/sbin/arp -a', - timeout_seconds=10.0 - ) + output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0) except Exception as e: logger.exception(e) return for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': + if ( + ip is not None + and mac is not None + and mac != 'UNKNOWN' + and ip != 'UNKNOWN' + ): mac = mac.lower() logger.debug(f'ARPER: {mac} => {ip}') self.state[mac] = ip @@ -108,8 +116,8 @@ class Arper(persistent.Persistent): def load(cls) -> Any: cache_file = config.config['arper_cache_location'] if persistent.was_file_written_within_n_seconds( - cache_file, - config.config['arper_cache_max_staleness'].total_seconds(), + cache_file, + config.config['arper_cache_max_staleness'].total_seconds(), ): logger.debug(f'Loading state from {cache_file}') cached_state = BiDict() @@ -123,14 +131,16 @@ class Arper(persistent.Persistent): mac = mac.lower() ip = ip.strip() cached_state[mac] = ip - if len(cached_state) > config.config['arper_min_entries_to_be_valid']: + if ( + len(cached_state) + > config.config['arper_min_entries_to_be_valid'] + ): return cls(cached_state) else: - logger.warning( - f'{cache_file} sucks, only {len(cached_state)} entries. Deleting it.' - ) + msg = f'{cache_file} is invalid: only {len(cached_state)} entries. Deleting it.' + logger.warning(msg) + warnings.warn(msg, stacklevel=2) os.remove(cache_file) - logger.debug('No usable saved state found') return None @@ -140,7 +150,9 @@ class Arper(persistent.Persistent): logger.debug( f'Persisting state to {config.config["arper_cache_location"]}' ) - with file_utils.FileWriter(config.config['arper_cache_location']) as wf: + with file_utils.FileWriter( + config.config['arper_cache_location'] + ) as wf: for (mac, ip) in self.state.items(): mac = mac.lower() print(f'{mac}, {ip}', file=wf)