From 17e8082381dbbf691dfb19fb1b38a97e48d6ab87 Mon Sep 17 00:00:00 2001 From: Scott Date: Wed, 26 Jan 2022 21:19:41 -0800 Subject: [PATCH] Reformatting. --- arper.py | 41 ++++---- base_presence.py | 50 ++++++---- camera_utils.py | 20 ++-- logging_utils.py | 252 ++++++++++++++++++++++++++--------------------- 4 files changed, 204 insertions(+), 159 deletions(-) diff --git a/arper.py b/arper.py index 696bf97..29a8a12 100644 --- a/arper.py +++ b/arper.py @@ -36,21 +36,19 @@ cfg.add_argument( type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 15), metavar='DURATION', - help='Max acceptable age of the kernel arp table cache' + help='Max acceptable age of the kernel arp table cache', ) cfg.add_argument( '--arper_min_entries_to_be_valid', type=int, default=site_config.get_config().arper_minimum_device_count, - help='Min number of arp entries to bother persisting.' + help='Min number of arp entries to bother persisting.', ) @persistent.persistent_autoloaded_singleton() class Arper(persistent.Persistent): - def __init__( - self, cached_state: Optional[BiDict] = None - ) -> None: + def __init__(self, cached_state: Optional[BiDict] = None) -> None: self.state = BiDict() if cached_state is not None: logger.debug('Loading Arper map from cached state.') @@ -60,14 +58,16 @@ class Arper(persistent.Persistent): self.update_from_arp_scan() self.update_from_arp() if len(self.state) < config.config['arper_min_entries_to_be_valid']: - raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.') + raise Exception( + f'Arper didn\'t find enough entries; only got {len(self.state)}.' + ) def update_from_arp_scan(self): network_spec = site_config.get_config().network try: output = exec_utils.cmd( f'/usr/local/bin/arp-scan --retry=6 --timeout 350 --backoff=1.4 --random --numeric --plain --ignoredups {network_spec}', - timeout_seconds=10.0 + timeout_seconds=10.0, ) except Exception as e: logger.exception(e) @@ -75,24 +75,31 @@ class Arper(persistent.Persistent): for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': + if ( + ip is not None + and mac is not None + and mac != 'UNKNOWN' + and ip != 'UNKNOWN' + ): mac = mac.lower() logger.debug(f'ARPER: {mac} => {ip}') self.state[mac] = ip def update_from_arp(self): try: - output = exec_utils.cmd( - '/usr/sbin/arp -a', - timeout_seconds=10.0 - ) + output = exec_utils.cmd('/usr/sbin/arp -a', timeout_seconds=10.0) except Exception as e: logger.exception(e) return for line in output.split('\n'): ip = string_utils.extract_ip_v4(line) mac = string_utils.extract_mac_address(line) - if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN': + if ( + ip is not None + and mac is not None + and mac != 'UNKNOWN' + and ip != 'UNKNOWN' + ): mac = mac.lower() logger.debug(f'ARPER: {mac} => {ip}') self.state[mac] = ip @@ -109,8 +116,8 @@ class Arper(persistent.Persistent): def load(cls) -> Any: cache_file = config.config['arper_cache_location'] if persistent.was_file_written_within_n_seconds( - cache_file, - config.config['arper_cache_max_staleness'].total_seconds(), + cache_file, + config.config['arper_cache_max_staleness'].total_seconds(), ): logger.debug(f'Loading state from {cache_file}') cached_state = BiDict() @@ -137,9 +144,7 @@ class Arper(persistent.Persistent): @overrides def save(self) -> bool: if len(self.state) > config.config['arper_min_entries_to_be_valid']: - 
logger.debug( - f'Persisting state to {config.config["arper_cache_location"]}' - ) + logger.debug(f'Persisting state to {config.config["arper_cache_location"]}') with file_utils.FileWriter(config.config['arper_cache_location']) as wf: for (mac, ip) in self.state.items(): mac = mac.lower() diff --git a/base_presence.py b/base_presence.py index 94c5e2f..f846e65 100755 --- a/base_presence.py +++ b/base_presence.py @@ -25,16 +25,16 @@ cfg = config.add_commandline_args( cfg.add_argument( "--presence_macs_file", type=argparse_utils.valid_filename, - default = "/home/scott/cron/persisted_mac_addresses.txt", + default="/home/scott/cron/persisted_mac_addresses.txt", metavar="FILENAME", - help="The location of persisted_mac_addresses.txt to use." + help="The location of persisted_mac_addresses.txt to use.", ) cfg.add_argument( '--presence_tolerable_staleness_seconds', type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 5), metavar='DURATION', - help='Max acceptable age of location data before auto-refreshing' + help='Max acceptable age of location data before auto-refreshing', ) @@ -43,16 +43,16 @@ class PresenceDetection(object): # Note: list most important devices first. self.devices_by_person: Dict[Person, List[str]] = { Person.SCOTT: [ - "DC:E5:5B:0F:03:3D", # pixel6 - "6C:40:08:AE:DC:2E", # laptop + "DC:E5:5B:0F:03:3D", # pixel6 + "6C:40:08:AE:DC:2E", # laptop ], Person.LYNN: [ - "08:CC:27:63:26:14", # motog7 - "B8:31:B5:9A:4F:19", # laptop + "08:CC:27:63:26:14", # motog7 + "B8:31:B5:9A:4F:19", # laptop ], Person.ALEX: [ - "0C:CB:85:0C:8B:AE", # phone - "D0:C6:37:E3:36:9A", # laptop + "0C:CB:85:0C:8B:AE", # phone + "D0:C6:37:E3:36:9A", # laptop ], Person.AARON_AND_DANA: [ "98:B6:E9:E5:5A:7C", @@ -83,7 +83,10 @@ class PresenceDetection(object): else: now = datetime.datetime.now() delta = now - self.last_update - if delta.total_seconds() > config.config['presence_tolerable_staleness_seconds'].total_seconds(): + if ( + delta.total_seconds() + > config.config['presence_tolerable_staleness_seconds'].total_seconds() + ): logger.debug( f"It's been {delta.total_seconds()}s since last update; refreshing now." 
) @@ -101,6 +104,7 @@ class PresenceDetection(object): def update_from_house(self) -> None: from exec_utils import cmd + try: persisted_macs = config.config['presence_macs_file'] except KeyError: @@ -121,6 +125,7 @@ class PresenceDetection(object): def update_from_cabin(self) -> None: from exec_utils import cmd + try: persisted_macs = config.config['presence_macs_file'] except KeyError: @@ -139,9 +144,7 @@ class PresenceDetection(object): warnings.warn(msg, stacklevel=2) self.dark_locations.add(Location.HOUSE) - def read_persisted_macs_file( - self, filename: str, location: Location - ) -> None: + def read_persisted_macs_file(self, filename: str, location: Location) -> None: if location is Location.UNKNOWN: return with open(filename, "r") as rf: @@ -170,9 +173,9 @@ class PresenceDetection(object): logger.exception(e) continue mac = mac.strip() - (self.location_ts_by_mac[location])[ - mac - ] = datetime.datetime.fromtimestamp(int(ts.strip())) + (self.location_ts_by_mac[location])[mac] = datetime.datetime.fromtimestamp( + int(ts.strip()) + ) ip_name = ip_name.strip() match = re.match(r"(\d+\.\d+\.\d+\.\d+) +\(([^\)]+)\)", ip_name) if match is not None: @@ -198,7 +201,9 @@ class PresenceDetection(object): def where_is_person_now(self, name: Person) -> Location: self.maybe_update() if len(self.dark_locations) > 0: - msg = f"Can't see {self.dark_locations} right now; answer confidence impacted" + msg = ( + f"Can't see {self.dark_locations} right now; answer confidence impacted" + ) logger.warning(msg) warnings.warn(msg, stacklevel=2) logger.debug(f'Looking for {name}...') @@ -210,6 +215,7 @@ class PresenceDetection(object): return Location.UNKNOWN import dict_utils + votes: Dict[Location, int] = {} tiebreaks: Dict[Location, datetime.datetime] = {} credit = 10000 @@ -224,14 +230,14 @@ class PresenceDetection(object): logger.debug(f'Seen {mac} ({mac_name}) at {location} since {ts}') tiebreaks[location] = ts - (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value(tiebreaks) + (most_recent_location, first_seen_ts) = dict_utils.item_with_max_value( + tiebreaks + ) bonus = credit v = votes.get(most_recent_location, 0) votes[most_recent_location] = v + bonus logger.debug(f'{name}: {location} gets {bonus} votes.') - credit = int( - credit * 0.2 - ) # Note: list most important devices first + credit = int(credit * 0.2) # Note: list most important devices first if credit <= 0: credit = 1 if len(votes) > 0: @@ -247,6 +253,8 @@ def main() -> None: for person in Person: print(f'{person} => {p.where_is_person_now(person)}') print() + + # for location in Location: # print(f'{location} => {p.is_anyone_in_location_now(location)}') diff --git a/camera_utils.py b/camera_utils.py index 204a337..799efd3 100644 --- a/camera_utils.py +++ b/camera_utils.py @@ -20,6 +20,7 @@ logger = logging.getLogger(__name__) class RawJpgHsv(NamedTuple): """Raw image bytes, the jpeg image and the HSV (hue saturation value) image.""" + raw: Optional[bytes] jpg: Optional[np.ndarray] hsv: Optional[np.ndarray] @@ -27,12 +28,14 @@ class RawJpgHsv(NamedTuple): class SanityCheckImageMetadata(NamedTuple): """Is a Blue Iris image bad (big grey borders around it) or infrared?""" + is_bad_image: bool is_infrared_image: bool def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata: """See if a Blue Iris or Shinobi image is bad and infrared.""" + def is_near(a, b) -> bool: return abs(a - b) < 3 @@ -44,12 +47,12 @@ def sanity_check_image(hsv: np.ndarray) -> SanityCheckImageMetadata: for c in range(cols): pixel = hsv[(r, 
c)] if ( - is_near(pixel[0], 16) and - is_near(pixel[1], 117) and - is_near(pixel[2], 196) + is_near(pixel[0], 16) + and is_near(pixel[1], 117) + and is_near(pixel[2], 196) ): weird_orange_count += 1 - elif (is_near(pixel[0], 0) and is_near(pixel[1], 0)): + elif is_near(pixel[0], 0) and is_near(pixel[1], 0): hs_zero_count += 1 logger.debug(f"hszero#={hs_zero_count}, weird_orange={weird_orange_count}") return SanityCheckImageMetadata( @@ -160,17 +163,13 @@ def fetch_camera_image_from_rtsp_stream( def _fetch_camera_image( camera_name: str, *, width: int = 256, quality: int = 70 ) -> RawJpgHsv: - """Fetch a webcam image given the camera name. - - """ + """Fetch a webcam image given the camera name.""" logger.debug("Trying to fetch camera image from video server") raw = fetch_camera_image_from_video_server( camera_name, width=width, quality=quality ) if raw is None: - logger.debug( - "Reading from video server failed; trying direct RTSP stream" - ) + logger.debug("Reading from video server failed; trying direct RTSP stream") raw = fetch_camera_image_from_rtsp_stream(camera_name, width=width) if raw is not None and len(raw) > 0: tmp = np.frombuffer(raw, dtype="uint8") @@ -199,4 +198,5 @@ def fetch_camera_image( if __name__ == '__main__': import doctest + doctest.testmod() diff --git a/logging_utils.py b/logging_utils.py index 2b39767..a15ccd6 100644 --- a/logging_utils.py +++ b/logging_utils.py @@ -22,9 +22,7 @@ import pytz import argparse_utils import config -cfg = config.add_commandline_args( - f'Logging ({__file__})', - 'Args related to logging') +cfg = config.add_commandline_args(f'Logging ({__file__})', 'Args related to logging') cfg.add_argument( '--logging_config_file', type=argparse_utils.valid_filename, @@ -44,14 +42,14 @@ cfg.add_argument( '--logging_format', type=str, default=None, - help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects' + help='The format for lines logged via the logger module. See: https://docs.python.org/3/library/logging.html#formatter-objects', ) cfg.add_argument( '--logging_date_format', type=str, default='%Y/%m/%dT%H:%M:%S.%f%z', metavar='DATEFMT', - help='The format of any dates in --logging_format.' + help='The format of any dates in --logging_format.', ) cfg.add_argument( '--logging_console', @@ -64,35 +62,55 @@ cfg.add_argument( type=str, default=None, metavar='FILENAME', - help='The filename of the logfile to write.' + help='The filename of the logfile to write.', ) cfg.add_argument( '--logging_filename_maxsize', type=int, - default=(1024*1024), + default=(1024 * 1024), metavar='#BYTES', - help='The maximum size (in bytes) to write to the logging_filename.' + help='The maximum size (in bytes) to write to the logging_filename.', ) cfg.add_argument( '--logging_filename_count', type=int, default=7, metavar='COUNT', - help='The number of logging_filename copies to keep before deleting.' + help='The number of logging_filename copies to keep before deleting.', ) cfg.add_argument( '--logging_syslog', action=argparse_utils.ActionNoYes, default=False, - help='Should we log to localhost\'s syslog.' 
+ help='Should we log to localhost\'s syslog.', ) cfg.add_argument( '--logging_syslog_facility', type=str, - default = 'USER', - choices=['NOTSET', 'AUTH', 'AUTH_PRIV', 'CRON', 'DAEMON', 'FTP', 'KERN', 'LPR', 'MAIL', 'NEWS', - 'SYSLOG', 'USER', 'UUCP', 'LOCAL0', 'LOCAL1', 'LOCAL2', 'LOCAL3', 'LOCAL4', 'LOCAL5', - 'LOCAL6', 'LOCAL7'], + default='USER', + choices=[ + 'NOTSET', + 'AUTH', + 'AUTH_PRIV', + 'CRON', + 'DAEMON', + 'FTP', + 'KERN', + 'LPR', + 'MAIL', + 'NEWS', + 'SYSLOG', + 'USER', + 'UUCP', + 'LOCAL0', + 'LOCAL1', + 'LOCAL2', + 'LOCAL3', + 'LOCAL4', + 'LOCAL5', + 'LOCAL6', + 'LOCAL7', + ], metavar='SYSLOG_FACILITY_LIST', help='The default syslog message facility identifier', ) @@ -100,59 +118,59 @@ cfg.add_argument( '--logging_debug_threads', action=argparse_utils.ActionNoYes, default=False, - help='Should we prepend pid/tid data to all log messages?' + help='Should we prepend pid/tid data to all log messages?', ) cfg.add_argument( '--logging_debug_modules', action=argparse_utils.ActionNoYes, default=False, - help='Should we prepend module/function data to all log messages?' + help='Should we prepend module/function data to all log messages?', ) cfg.add_argument( '--logging_info_is_print', action=argparse_utils.ActionNoYes, default=False, - help='logging.info also prints to stdout.' + help='logging.info also prints to stdout.', ) cfg.add_argument( '--logging_squelch_repeats', action=argparse_utils.ActionNoYes, default=True, - help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?' + help='Do we allow code to indicate that it wants to squelch repeated logging messages or should we always log?', ) cfg.add_argument( '--logging_probabilistically', action=argparse_utils.ActionNoYes, default=True, - help='Do we allow probabilistic logging (for code that wants it) or should we always log?' + help='Do we allow probabilistic logging (for code that wants it) or should we always log?', ) # See also: OutputMultiplexer cfg.add_argument( '--logging_captures_prints', action=argparse_utils.ActionNoYes, default=False, - help='When calling print, also log.info automatically.' + help='When calling print, also log.info automatically.', ) cfg.add_argument( '--lmodule', type=str, metavar='<SCOPE>=<LEVEL>[,<SCOPE>=<LEVEL>...]', help=( - 'Allows per-scope logging levels which override the global level set with --logging-level.' + - 'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' + - 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)' - ) + 'Allows per-scope logging levels which override the global level set with --logging-level. ' + + 'Pass a space separated list of <scope>=<level> where <scope> is one of: module, ' + + 'module:function, or :function and <level> is a logging level (e.g. INFO, DEBUG...)' + ), ) cfg.add_argument( '--logging_clear_preexisting_handlers', action=argparse_utils.ActionNoYes, default=True, help=( - 'Should logging code clear preexisting global logging handlers and thus insist that is ' + - 'alone can add handlers. Use this to work around annoying modules that insert global ' + - 'handlers with formats and logging levels you might now want. Caveat emptor, this may ' + - 'cause you to miss logging messages.' - ) + 'Should logging code clear preexisting global logging handlers and thus insist that it ' + + 'alone can add handlers. Use this to work around annoying modules that insert global ' + + 'handlers with formats and logging levels you might not want. Caveat emptor, this may ' + + 'cause you to miss logging messages.'
+ ), ) built_in_print = print @@ -176,11 +194,14 @@ def squelch_repeated_log_messages(squelch_after_n_repeats: int) -> Callable: string), the messages are considered to be different. """ + def squelch_logging_wrapper(f: Callable): import function_utils + identifier = function_utils.function_identifier(f) squelched_logging_counts[identifier] = squelch_after_n_repeats return f + return squelch_logging_wrapper @@ -199,6 +220,7 @@ class SquelchRepeatedMessagesFilter(logging.Filter): the --no_logging_squelch_repeats commandline flag. """ + def __init__(self) -> None: self.counters = collections.Counter() super().__init__() @@ -220,24 +242,23 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter): module names or module:function names. Blocks others. """ + @staticmethod def level_name_to_level(name: str) -> int: - numeric_level = getattr( - logging, - name, - None - ) + numeric_level = getattr(logging, name, None) if not isinstance(numeric_level, int): raise ValueError(f'Invalid level: {name}') return numeric_level def __init__( - self, - default_logging_level: int, - per_scope_logging_levels: str, + self, + default_logging_level: int, + per_scope_logging_levels: str, ) -> None: super().__init__() - self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']) + self.valid_levels = set( + ['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'] + ) self.default_logging_level = default_logging_level self.level_by_scope = {} if per_scope_logging_levels is not None: @@ -245,7 +266,7 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter): if '=' not in chunk: print( f'Malformed lmodule directive: "{chunk}", missing "=". Ignored.', - file=sys.stderr + file=sys.stderr, ) continue try: @@ -253,7 +274,7 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter): except ValueError: print( f'Malformed lmodule directive: "{chunk}". Ignored.', - file=sys.stderr + file=sys.stderr, ) continue scope = scope.strip() @@ -261,14 +282,12 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter): if level not in self.valid_levels: print( f'Malformed lmodule directive: "{chunk}", bad level. Ignored.', - file=sys.stderr + file=sys.stderr, ) continue - self.level_by_scope[scope] = ( - DynamicPerScopeLoggingLevelFilter.level_name_to_level( - level - ) - ) + self.level_by_scope[ + scope + ] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(level) @overrides def filter(self, record: logging.LogRecord) -> bool: @@ -276,9 +295,9 @@ class DynamicPerScopeLoggingLevelFilter(logging.Filter): if len(self.level_by_scope) > 0: min_level = None for scope in ( - record.module, - f'{record.module}:{record.funcName}', - f':{record.funcName}' + record.module, + f'{record.module}:{record.funcName}', + f':{record.funcName}', ): level = self.level_by_scope.get(scope, None) if level is not None: @@ -311,11 +330,14 @@ def logging_is_probabilistic(probability_of_logging: float) -> Callable: This affects *ALL* logging statements within the marked function. """ + def probabilistic_logging_wrapper(f: Callable): import function_utils + identifier = function_utils.function_identifier(f) probabilistic_logging_levels[identifier] = probability_of_logging return f + return probabilistic_logging_wrapper @@ -328,6 +350,7 @@ class ProbabilisticFilter(logging.Filter): been tagged with the @logging_utils.probabilistic_logging decorator. 
""" + @overrides def filter(self, record: logging.LogRecord) -> bool: id1 = f'{record.module}:{record.funcName}' @@ -345,6 +368,7 @@ class OnlyInfoFilter(logging.Filter): stdout handler. """ + @overrides def filter(self, record: logging.LogRecord): return record.levelno == logging.INFO @@ -356,6 +380,7 @@ class MillisecondAwareFormatter(logging.Formatter): whatever reason, the default python logger doesn't do. """ + converter = datetime.datetime.fromtimestamp @overrides @@ -395,9 +420,7 @@ def initialize_logging(logger=None) -> logging.Logger: # Global default logging level (--logging_level) default_logging_level = getattr( - logging, - config.config['logging_level'].upper(), - None + logging, config.config['logging_level'].upper(), None ) if not isinstance(default_logging_level, int): raise ValueError('Invalid level: %s' % config.config['logging_level']) @@ -431,8 +454,8 @@ def initialize_logging(logger=None) -> logging.Logger: if config.config['logging_filename']: handler = RotatingFileHandler( config.config['logging_filename'], - maxBytes = config.config['logging_filename_maxsize'], - backupCount = config.config['logging_filename_count'], + maxBytes=config.config['logging_filename_maxsize'], + backupCount=config.config['logging_filename_count'], ) handler.setFormatter( MillisecondAwareFormatter( @@ -483,6 +506,7 @@ def initialize_logging(logger=None) -> logging.Logger: if config.config['logging_captures_prints']: import builtins + global built_in_print def print_and_also_log(*arg, **kwarg): @@ -492,29 +516,43 @@ def initialize_logging(logger=None) -> logging.Logger: else: logger.info(*arg) built_in_print(*arg, **kwarg) + builtins.print = print_and_also_log # At this point the logger is ready, handlers are set up, # etc... so log about the logging configuration. - level_name = logging._levelToName.get(default_logging_level, str(default_logging_level)) - logger.debug( - f'Initialized global logging; default logging level is {level_name}.' 
+ level_name = logging._levelToName.get( + default_logging_level, str(default_logging_level) ) - if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0: + logger.debug(f'Initialized global logging; default logging level is {level_name}.') + if ( + config.config['logging_clear_preexisting_handlers'] + and preexisting_handlers_count > 0 + ): msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)' logger.warning(msg) logger.debug(f'Logging format specification is "{fmt}"') if config.config['logging_debug_threads']: - logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)') + logger.debug( + '...Logging format spec captures tid/pid (--logging_debug_threads)' + ) if config.config['logging_debug_modules']: - logger.debug('...Logging format spec captures files/functions/lineno (--logging_debug_modules)') + logger.debug( + '...Logging format spec captures files/functions/lineno (--logging_debug_modules)' + ) if config.config['logging_syslog']: - logger.debug(f'Logging to syslog as {facility_name} with priority mapping based on level') + logger.debug( + f'Logging to syslog as {facility_name} with priority mapping based on level' + ) if config.config['logging_filename']: logger.debug(f'Logging to filename {config.config["logging_filename"]}') - logger.debug(f'...with {config.config["logging_filename_maxsize"]} bytes max file size.') - logger.debug(f'...and {config.config["logging_filename_count"]} rotating backup file count.') + logger.debug( + f'...with {config.config["logging_filename_maxsize"]} bytes max file size.' + ) + logger.debug( + f'...and {config.config["logging_filename_count"]} rotating backup file count.' + ) if config.config['logging_console']: logger.debug('Logging to the console (stderr).') if config.config['logging_info_is_print']: @@ -542,7 +580,9 @@ def initialize_logging(logger=None) -> logging.Logger: f'Logging dynamic per-module logging enabled (--lmodule={config.config["lmodule"]})' ) if config.config['logging_captures_prints']: - logger.debug('Logging will capture printed data as logger.info messages (--logging_captures_prints)') + logger.debug( + 'Logging will capture printed data as logger.info messages (--logging_captures_prints)' + ) return logger @@ -559,6 +599,7 @@ def tprint(*args, **kwargs) -> None: """ if config.config['logging_debug_threads']: from thread_utils import current_thread_id + print(f'{current_thread_id()}', end="") print(*args, **kwargs) else: @@ -582,50 +623,48 @@ class OutputMultiplexer(object): easy usage pattern. """ + class Destination(enum.IntEnum): """Bits in the destination_bitv bitvector. Used to indicate the output destination.""" - LOG_DEBUG = 0x01 # ⎫ - LOG_INFO = 0x02 # ⎪ - LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor. - LOG_ERROR = 0x08 # ⎪ - LOG_CRITICAL = 0x10 # ⎭ - FILENAMES = 0x20 # Must provide a filename to the c'tor. - FILEHANDLES = 0x40 # Must provide a handle to the c'tor. + + LOG_DEBUG = 0x01 # ⎫ + LOG_INFO = 0x02 # ⎪ + LOG_WARNING = 0x04 # ⎬ Must provide logger to the c'tor. + LOG_ERROR = 0x08 # ⎪ + LOG_CRITICAL = 0x10 # ⎭ + FILENAMES = 0x20 # Must provide a filename to the c'tor. + FILEHANDLES = 0x40 # Must provide a handle to the c'tor. 
HLOG = 0x80 ALL_LOG_DESTINATIONS = ( LOG_DEBUG | LOG_INFO | LOG_WARNING | LOG_ERROR | LOG_CRITICAL ) ALL_OUTPUT_DESTINATIONS = 0x8F - def __init__(self, - destination_bitv: int, - *, - logger=None, - filenames: Optional[Iterable[str]] = None, - handles: Optional[Iterable[io.TextIOWrapper]] = None): + def __init__( + self, + destination_bitv: int, + *, + logger=None, + filenames: Optional[Iterable[str]] = None, + handles: Optional[Iterable[io.TextIOWrapper]] = None, + ): if logger is None: logger = logging.getLogger(None) self.logger = logger if filenames is not None: - self.f = [ - open(filename, 'wb', buffering=0) for filename in filenames - ] + self.f = [open(filename, 'wb', buffering=0) for filename in filenames] else: if destination_bitv & OutputMultiplexer.FILENAMES: - raise ValueError( - "Filenames argument is required if bitv & FILENAMES" - ) + raise ValueError("Filenames argument is required if bitv & FILENAMES") self.f = None if handles is not None: self.h = [handle for handle in handles] else: if destination_bitv & OutputMultiplexer.Destination.FILEHANDLES: - raise ValueError( - "Handle argument is required if bitv & FILEHANDLES" - ) + raise ValueError("Handle argument is required if bitv & FILEHANDLES") self.h = None self.set_destination_bitv(destination_bitv) @@ -635,17 +674,14 @@ class OutputMultiplexer(object): def set_destination_bitv(self, destination_bitv: int): if destination_bitv & self.Destination.FILENAMES and self.f is None: - raise ValueError( - "Filename argument is required if bitv & FILENAMES" - ) + raise ValueError("Filename argument is required if bitv & FILENAMES") if destination_bitv & self.Destination.FILEHANDLES and self.h is None: - raise ValueError( - "Handle argument is required if bitv & FILEHANDLES" - ) + raise ValueError("Handle argument is required if bitv & FILEHANDLES") self.destination_bitv = destination_bitv def print(self, *args, **kwargs): from string_utils import sprintf, strip_escape_sequences + end = kwargs.pop("end", None) if end is not None: if not isinstance(end, str): @@ -663,18 +699,12 @@ class OutputMultiplexer(object): end = "\n" if end == '\n': buf += '\n' - if ( - self.destination_bitv & self.Destination.FILENAMES and - self.f is not None - ): + if self.destination_bitv & self.Destination.FILENAMES and self.f is not None: for _ in self.f: _.write(buf.encode('utf-8')) _.flush() - if ( - self.destination_bitv & self.Destination.FILEHANDLES and - self.h is not None - ): + if self.destination_bitv & self.Destination.FILEHANDLES and self.h is not None: for _ in self.h: _.write(buf) _.flush() @@ -715,17 +745,18 @@ class OutputMultiplexerContext(OutputMultiplexer, contextlib.ContextDecorator): mplex.print("This is a log message!") """ - def __init__(self, - destination_bitv: OutputMultiplexer.Destination, - *, - logger = None, - filenames = None, - handles = None): + + def __init__( + self, + destination_bitv: OutputMultiplexer.Destination, + *, + logger=None, + filenames=None, + handles=None, + ): super().__init__( - destination_bitv, - logger=logger, - filenames=filenames, - handles=handles) + destination_bitv, logger=logger, filenames=filenames, handles=handles + ) def __enter__(self): return self @@ -751,4 +782,5 @@ def hlog(message: str) -> None: if __name__ == '__main__': import doctest + doctest.testmod() -- 2.45.2
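For reviewers who want to exercise the logging_utils surface reformatted above end to end, here is a minimal usage sketch. It is illustrative only: the file name demo.py, the sample command line, and the config.parse() bootstrap call are assumptions (the exact signature of this repo's config.parse may differ); initialize_logging, squelch_repeated_log_messages, logging_is_probabilistic, OutputMultiplexerContext, and the flags referenced in the comments all appear in the diff above.

# demo.py -- hypothetical driver; run with this repo on PYTHONPATH, e.g.:
#   python3 demo.py --logging_console --lmodule=demo:chatty=DEBUG
import logging
import sys

import config          # this repo's config module
import logging_utils   # the module reformatted above

logger = logging.getLogger(__name__)


@logging_utils.squelch_repeated_log_messages(1)
def noisy() -> None:
    # Under --logging_squelch_repeats (default on), this identical message
    # is emitted once; subsequent repeats are dropped by the filter.
    logger.info('Same message every time.')


@logging_utils.logging_is_probabilistic(0.25)
def chatty() -> None:
    # Under --logging_probabilistically (default on), log sites in this
    # function fire only ~25% of the time (still subject to level filters).
    logger.debug('Sampled message.')


def main() -> None:
    logging_utils.initialize_logging(logging.getLogger())
    for _ in range(10):
        noisy()
        chatty()
    # Fan a single message out to both the logger (at INFO) and stdout.
    with logging_utils.OutputMultiplexerContext(
        logging_utils.OutputMultiplexer.Destination.LOG_INFO
        | logging_utils.OutputMultiplexer.Destination.FILEHANDLES,
        logger=logger,
        handles=[sys.stdout],
    ) as mplex:
        mplex.print('Multiplexed to logger and stdout.')


if __name__ == '__main__':
    config.parse()  # assumption: repo's argparse/config bootstrap entry point
    main()

Note the design split this patch preserves: the decorators merely tag functions, while SquelchRepeatedMessagesFilter and ProbabilisticFilter (installed by initialize_logging) do the actual dropping, so call sites remain plain logger.info()/logger.debug() calls.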