entry_descr = None
try:
entry_filename = entry_point.__code__.co_filename
- entry_descr = entry_point.__code__.__repr__()
+ entry_descr = repr(entry_point.__code__)
except Exception:
if (
"__globals__" in entry_point.__dict__
self.root = {}
self.end = "~END~"
self.length = 0
- self.viz = ''
+ self.viz = ""
self.content_generator: Generator[str] = None
def insert(self, item: Sequence[Any]) -> None:
ends.
Args:
- root_node: root under which to search for item
+ node: root under which to search for item
item: item whose node is the root of the recursive deletion operation
Returns:
return self.length
def __iter__(self):
- self.content_generator = self.generate_recursively(self.root, '')
+ self.content_generator = self.generate_recursively(self.root, "")
return self
def generate_recursively(self, node, path: Sequence[Any]):
:meth:`__repr__`.
"""
if node is None:
- return ''
+ return ""
if node is not self.root:
- ret = f'\n{padding}{pointer}'
+ ret = f"\n{padding}{pointer}"
if has_sibling:
- padding += '│ '
+ padding += "│ "
else:
- padding += ' '
+ padding += " "
else:
- ret = f'{pointer}'
+ ret = f"{pointer}"
child_count = 0
for child in node:
else:
pointer = "└──"
has_sibling = False
- pointer += f'{child}'
+ pointer += f"{child}"
child_count -= 1
ret += self._repr_fancy(padding, pointer, node[child], has_sibling)
return ret
"""
child_count = 0
- my_rep = ''
+ my_rep = ""
for child in node:
if child != self.end:
child_count += 1
if len(my_rep) > 1:
my_rep = my_rep[:-1]
if child_count > 1:
- my_rep = f'[{my_rep}]'
+ my_rep = f"[{my_rep}]"
return my_rep
def __repr__(self):
└──2
"""
- return self._repr_fancy('', '*', self.root, False)
+ return self._repr_fancy("", "*", self.root, False)
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
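# Illustrative sketch of the nested-dict layout the trie above builds: each
# element of an inserted sequence becomes a dict key and a sentinel key marks
# where a complete item ends. All names here are hypothetical, not the class API.
SKETCH_END = "~END~"

def sketch_insert(root: dict, item) -> None:
    node = root
    for element in item:
        node = node.setdefault(element, {})
    node[SKETCH_END] = SKETCH_END  # mark the end of a complete item

sketch_root: dict = {}
sketch_insert(sketch_root, "ab")
sketch_insert(sketch_root, "ad")
# sketch_root == {'a': {'b': {'~END~': '~END~'}, 'd': {'~END~': '~END~'}}}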
special_characters = BiDict(
{
- ' ': 27,
- '.': 28,
- ',': 29,
+ " ": 27,
+ ".": 28,
+ ",": 29,
"-": 30,
'"': 31,
}
"""
compressed = bitstring.BitArray()
for letter in uncompressed:
- if 'a' <= letter <= 'z':
- bits = ord(letter) - ord('a') + 1 # 1..26
+ if "a" <= letter <= "z":
+ bits = ord(letter) - ord("a") + 1 # 1..26
else:
if letter not in special_characters:
raise Exception(
'scott'
"""
- decompressed = ''
+ decompressed = ""
kompressed = bitstring.BitArray(compressed)
# There are compressed messages that legitimately end with the
chunk = chunk.uint
if chunk == 0:
break
- elif 1 <= chunk <= 26:
- letter = chr(chunk - 1 + ord('a'))
+
+ if 1 <= chunk <= 26:
+ letter = chr(chunk - 1 + ord("a"))
else:
letter = special_characters.inverse[chunk][0]
decompressed += letter
return decompressed
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
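# A standalone sketch of the 5-bit packing scheme above, using integer shifts
# instead of bitstring: each letter a-z becomes the value 1..26, an all-zero
# chunk terminates the message, and the BiDict special characters are omitted.
def sketch_pack(text: str) -> int:
    packed = 0
    for letter in text:
        packed = (packed << 5) | (ord(letter) - ord("a") + 1)
    return packed << 5  # trailing all-zero chunk marks the end

def sketch_unpack(packed: int) -> str:
    chunks = []
    while packed > 0:
        chunks.append(packed & 0b11111)
        packed >>= 5
    letters = []
    for chunk in reversed(chunks):
        if chunk == 0:
            break
        letters.append(chr(chunk - 1 + ord("a")))
    return "".join(letters)

assert sketch_unpack(sketch_pack("scott")) == "scott"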
# when the user passes -h or --help, it will be visible on the
# screen w/o scrolling. This just makes for a nicer --help screen.
for arg in sys.argv:
- if arg in ("--help", "-h"):
+ if arg in {"--help", "-h"}:
if entry_module is not None:
entry_module = os.path.basename(entry_module)
ARGS._action_groups = Config._reorder_arg_action_groups_before_help(
#!/usr/bin/env python3
# type: ignore
-# pylint: disable=W0201
-# pylint: disable=R0904
+# pylint: disable=attribute-defined-outside-init
+# pylint: disable=too-many-public-methods
# © Copyright 2021-2023, Scott Gasch
"""
return self.time
- def get_datetime(self, *, tz=None) -> Optional[datetime.datetime]:
+ def get_datetime(
+ self, *, tz: Optional[datetime.tzinfo] = None
+ ) -> Optional[datetime.datetime]:
"""Get the datetime of the last :meth:`parse` operation again
or None.
name = DateParser._normalize_special_day_name(self.context['special'])
# Yesterday, today, tomorrow -- ignore any next/last
- if name in ('today', 'now'):
+ if name in {'today', 'now'}:
return today
if name == 'yeste':
return today + datetime.timedelta(days=-1)
# Try constructing an offset in seconds
try:
txt_sign = txt[0]
- if txt_sign in ('-', '+'):
+ if txt_sign in {'-', '+'}:
sign = +1 if txt_sign == '+' else -1
hour = int(txt[1:3])
minute = int(txt[-2:])
# Adjust count's sign based on the presence of 'before' or 'after'.
if 'delta_before_after' in self.context:
before_after = self.context['delta_before_after'].lower()
- if before_after in ('before', 'until', 'til', 'to'):
+ if before_after in {'before', 'until', 'til', 'to'}:
count = -count
# What are we counting units of?
# Adjust count's sign based on the presence of 'before' or 'after'.
if 'time_delta_before_after' in self.context:
before_after = self.context['time_delta_before_after'].lower()
- if before_after in ('before', 'until', 'til', 'to'):
+ if before_after in {'before', 'until', 'til', 'to'}:
count = -count
# What are we counting units of... assume minutes.
except Exception as e:
raise ParseException(f'Bad special time expression: {ctx.getText()}') from e
else:
- if txt in ('noon', 'midday'):
+ if txt in {'noon', 'midday'}:
self.context['hour'] = 12
self.context['minute'] = 0
self.context['seconds'] = 0
def get_format_string(
*,
- date_time_separator=" ",
- include_timezone=True,
- include_dayname=False,
- use_month_abbrevs=False,
- include_seconds=True,
- include_fractional=False,
- twelve_hour=True,
+ date_time_separator: str = " ",
+ include_timezone: bool = True,
+ include_dayname: bool = False,
+ use_month_abbrevs: bool = False,
+ include_seconds: bool = True,
+ include_fractional: bool = False,
+ twelve_hour: bool = True,
) -> str:
"""
Helper to return a format string without looking up the documentation
def datetime_to_string(
dt: datetime.datetime,
*,
- date_time_separator=" ",
- include_timezone=True,
- include_dayname=False,
- use_month_abbrevs=False,
- include_seconds=True,
- include_fractional=False,
- twelve_hour=True,
+ date_time_separator: str = " ",
+ include_timezone: bool = True,
+ include_dayname: bool = False,
+ use_month_abbrevs: bool = False,
+ include_seconds: bool = True,
+ include_fractional: bool = False,
+ twelve_hour: bool = True,
) -> str:
"""
A nice way to convert a datetime into a string; arguably better than
def string_to_datetime(
txt: str,
*,
- date_time_separator=" ",
- include_timezone=True,
- include_dayname=False,
- use_month_abbrevs=False,
- include_seconds=True,
- include_fractional=False,
- twelve_hour=True,
+ date_time_separator: str = " ",
+ include_timezone: bool = True,
+ include_dayname: bool = False,
+ use_month_abbrevs: bool = False,
+ include_seconds: bool = True,
+ include_fractional: bool = False,
+ twelve_hour: bool = True,
) -> Tuple[datetime.datetime, str]:
"""A nice way to convert a string into a datetime. Returns both the
datetime and the format string used to parse it. Also consider
def time_to_string(
dt: datetime.datetime,
*,
- include_seconds=True,
- include_fractional=False,
- include_timezone=False,
- twelve_hour=True,
+ include_seconds: bool = True,
+ include_fractional: bool = False,
+ include_timezone: bool = False,
+ twelve_hour: bool = True,
) -> str:
"""A nice way to convert a datetime into a time (only) string.
This ignores the date part of the datetime completely.
return f"{hour:2}:{minute:02}{ampm}"
-def parse_duration(duration: str, raise_on_error=False) -> int:
+def parse_duration(duration: str, raise_on_error: bool = False) -> int:
"""
Parse a duration in string form into a delta seconds.
return seconds
-def describe_duration(seconds: int, *, include_seconds=False) -> str:
+def describe_duration(seconds: int, *, include_seconds: bool = False) -> str:
"""
Describe a duration represented as a count of seconds nicely.
return describe_duration(int(delta.total_seconds())) # Note: drops milliseconds
-def describe_duration_briefly(seconds: int, *, include_seconds=False) -> str:
+def describe_duration_briefly(seconds: int, *, include_seconds: bool = False) -> str:
"""
Describe a duration briefly.
def describe_timedelta_briefly(
- delta: datetime.timedelta, *, include_seconds=False
+ delta: datetime.timedelta, *, include_seconds: bool = False
) -> str:
"""
Describe a duration represented by a timedelta object.
Args:
delta: the timedelta to describe briefly
+ include_seconds: should seconds be included in the description?
Returns:
A string description of the input timedelta object.
EASTER_WESTERN = 3
-def easter(year, method=EASTER_WESTERN):
+def easter(year: int, method: int = EASTER_WESTERN):
"""
This method was ported from the work done by GM Arts,
on top of the algorithm by Claus Tondering, which was
import time
import traceback
import warnings
-from typing import Any, Callable, List, Optional
+from typing import Any, Callable, List, Optional, Union
# This module is commonly used by others in here and should avoid
# taking any unnecessary dependencies back on them.
wait_time = min_interval_seconds - elapsed_since_last
else:
wait_time = 0.0
- logger.debug('@%.4f> wait_time = %.4f', time.time(), wait_time)
+ logger.debug("@%.4f> wait_time = %.4f", time.time(), wait_time)
return wait_time
def wrapper_wrapper_rate_limited(*args, **kargs) -> Any:
):
break
with cv:
- logger.debug('@%.4f> calling it...', time.time())
+ logger.debug("@%.4f> calling it...", time.time())
ret = func(*args, **kargs)
last_invocation_timestamp[0] = time.time()
logger.debug(
- '@%.4f> Last invocation <- %.4f',
+ "@%.4f> Last invocation <- %.4f",
time.time(),
last_invocation_timestamp[0],
)
def __call__(self, *args, **kwargs):
"""Returns a single instance of decorated class"""
logger.debug(
- '@singleton returning global instance of %s', self.__wrapped__.__name__
+ "@singleton returning global instance of %s", self.__wrapped__.__name__
)
if self._instance is None:
self._instance = self.__wrapped__(*args, **kwargs)
cache_key = args + tuple(kwargs.items())
if cache_key not in wrapper_memoized.cache:
value = func(*args, **kwargs)
- logger.debug('Memoizing %s => %s for %s', cache_key, value, func.__name__)
+ logger.debug("Memoizing %s => %s for %s", cache_key, value, func.__name__)
wrapper_memoized.cache[cache_key] = value
else:
- logger.debug('Returning memoized value for %s', {func.__name__})
+ logger.debug("Returning memoized value for %s", {func.__name__})
return wrapper_memoized.cache[cache_key]
wrapper_memoized.cache = {} # type: ignore
@functools.wraps(f)
def f_retry(*args, **kwargs):
mtries, mdelay = tries, delay_sec # make mutable
- logger.debug('deco_retry: will make up to %d attempts...', mtries)
+ logger.debug("deco_retry: will make up to %d attempts...", mtries)
retval = f(*args, **kwargs)
while mtries > 0:
if predicate(retval) is True:
- logger.debug('Predicate succeeded, deco_retry is done.')
+ logger.debug("Predicate succeeded, deco_retry is done.")
return retval
logger.debug("Predicate failed, sleeping and retrying.")
mtries -= 1
return deco_retry
-def retry_if_false(tries: int, *, delay_sec=3.0, backoff=2.0):
+def retry_if_false(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
"""A helper for `@predicated_retry_with_backoff` that retries a
decorated function as long as it keeps returning False.
)
-def retry_if_none(tries: int, *, delay_sec=3.0, backoff=2.0):
+def retry_if_none(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0):
"""A helper for `@predicated_retry_with_backoff` that continues to
invoke the wrapped function as long as it keeps returning None.
Retries up to N times with a delay between each retry and a
return decorate
-def synchronized(lock):
+def synchronized(lock: Union[threading.Lock, threading.RLock]):
"""Emulates java's "synchronized" keyword: given a lock, require
that threads take that lock (or wait) before invoking the wrapped
function and automatically releases the lock afterwards.
return decorator
-def decorate_matching_methods_with(decorator, acl=None):
+def decorate_matching_methods_with(decorator: Callable, acl: Optional[Callable] = None):
"""Apply the given decorator to all methods in a class whose names
begin with prefix. If prefix is None (default), decorate all
methods in the class.
def decorate_the_class(cls):
for name, m in inspect.getmembers(cls, inspect.isfunction):
- if acl is None:
+ if acl is None or acl(name):
setattr(cls, name, decorator(m))
- else:
- if acl(name):
- setattr(cls, name, decorator(m))
return cls
return decorate_the_class
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
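# A minimal standalone sketch of the predicated retry-with-backoff pattern the
# decorators above wrap: call f, and while the predicate rejects the result,
# sleep and retry with the delay multiplied by the backoff factor. The names
# here are illustrative, not the decorator_utils API.
import time
from typing import Any, Callable

def sketch_retry(
    f: Callable[[], Any],
    predicate: Callable[[Any], bool],
    tries: int = 3,
    delay_sec: float = 1.0,
    backoff: float = 2.0,
) -> Any:
    result = f()
    while tries > 0 and not predicate(result):
        time.sleep(delay_sec)
        delay_sec *= backoff
        tries -= 1
        result = f()
    return result

# e.g. keep retrying a hypothetical flaky_fetch() until it returns non-None:
# value = sketch_retry(flaky_fetch, lambda r: r is not None)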
long.
Args:
- command: the command to run timeout_seconds: the optional
- max number of seconds to allow the subprocess to execute or
- None to indicate no timeout
+ command: the command to run.
+ timeout_seconds: the optional max number of seconds to allow
+ the subprocess to execute or None (default) to indicate no
+ time limit.
Returns:
No return value; error conditions (including non-zero child process
for direntry in os.scandir(self.directory):
if direntry.is_file(follow_symlinks=True):
mtime = direntry.stat(follow_symlinks=True).st_mtime
- path = f'{self.directory}/{direntry.name}'
+ path = f"{self.directory}/{direntry.name}"
self._update_file(path, mtime)
def _update_file(self, filename: str, mtime: Optional[float] = None):
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
logger.debug(
- 'Computed/stored %s\'s MD5 at ts=%.2f (%s)', filename, mtime, md5
+ "Computed/stored %s's MD5 at ts=%.2f (%s)", filename, mtime, md5
)
self.mtime_by_filename[filename] = mtime
self.md5_by_filename[filename] = md5
"""
self._update_file(filename)
file_md5 = self.md5_by_filename.get(filename, 0)
- logger.debug('%s\'s checksum is %s', filename, file_md5)
+ logger.debug("%s's checksum is %s", filename, file_md5)
mem_hash = hashlib.md5()
mem_hash.update(proposed_contents)
md5 = mem_hash.hexdigest()
- logger.debug('Item\'s checksum is %s', md5)
+ logger.debug("Item's checksum is %s", md5)
return md5 != file_md5
self.md5_by_filename[filename] = md5
self.all_md5s.add(md5)
- def apply(self, proposed_contents: Any, ignored_filename: str = None) -> bool:
+ def apply(
+ self, proposed_contents: Any, ignored_filename: Optional[str] = None
+ ) -> bool:
"""Call this before writing a new file to directory with the
proposed_contents to be written and it will return a value that
indicates whether the identical contents is already sitting in
return md5 not in self.all_md5s
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
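# A minimal sketch of the change-detection idea above: hash the proposed
# contents and compare against the hash of what is already on disk so that a
# rewrite can be skipped when nothing changed. The helper name is hypothetical.
import hashlib

def sketch_contents_differ(path: str, proposed_contents: bytes) -> bool:
    try:
        with open(path, "rb") as f:
            on_disk_md5 = hashlib.md5(f.read()).hexdigest()
    except FileNotFoundError:
        return True  # nothing on disk yet, so it differs by definition
    return hashlib.md5(proposed_contents).hexdigest() != on_disk_md5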
def slurp_file(
filename: str,
*,
- skip_blank_lines=False,
+ skip_blank_lines: bool = False,
line_transformers: Optional[List[Callable[[str], str]]] = None,
):
"""Reads in a file's contents line-by-line to a memory buffer applying
return os.path.realpath(filespec)
-def create_path_if_not_exist(path, on_error=None) -> None:
+def create_path_if_not_exist(
+ path: str, on_error: Optional[Callable[[str, OSError], None]] = None
+) -> None:
"""
Attempts to create path if it does not exist already.
Args:
path: the path to attempt to create
- on_error: If True, it's invoked on error conditions. Otherwise
- any exceptions are raised.
+ on_error: If set, it's invoked on error conditions and passed the
+ path and the OSError that was raised.
See also :meth:`does_file_exist`.
def _convert_file_timestamp_to_datetime(
- filename: str, producer
+ filename: str,
+ producer: Callable[[str], Optional[float]],
) -> Optional[datetime.datetime]:
"""
Converts a raw file timestamp into a Python datetime.
Args:
filename: file whose timestamps should be converted.
producer: source of the timestamp.
-
Returns:
The datetime.
"""
return describe_duration(age)
-def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
+def describe_file_atime(filename: str, *, brief: bool = False) -> Optional[str]:
"""
Describe how long ago a file was accessed.
return describe_file_timestamp(filename, lambda x: x.st_atime, brief=brief)
-def describe_file_ctime(filename: str, *, brief=False) -> Optional[str]:
+def describe_file_ctime(filename: str, *, brief: bool = False) -> Optional[str]:
"""Describes a file's creation time.
Args:
return describe_file_timestamp(filename, lambda x: x.st_ctime, brief=brief)
-def describe_file_mtime(filename: str, *, brief=False) -> Optional[str]:
+def describe_file_mtime(filename: str, *, brief: bool = False) -> Optional[str]:
"""Describes how long ago a file was modified.
Args:
yield full_path
-def get_matching_files(directory: str, glob: str):
+def get_matching_files(directory: str, glob_string: str):
"""
Returns the subset of files whose name matches a glob.
Args:
directory: the directory to match files within.
- glob: the globbing pattern (may include '*' and '?') to
+ glob_string: the globbing pattern (may include '*' and '?') to
use when matching files.
Returns:
See also :meth:`get_files`, :meth:`expand_globs`.
"""
for filename in get_files(directory):
- if fnmatch.fnmatch(filename, glob):
+ if fnmatch.fnmatch(filename, glob_string):
yield filename
yield file_or_directory
-def get_matching_files_recursive(directory: str, glob: str):
- """
- Returns the subset of files whose name matches a glob under a root recursively.
+def get_matching_files_recursive(directory: str, glob_string: str):
+ """Returns the subset of files whose name matches a glob under a root recursively.
Args:
directory: the root under which to search
- glob: a globbing pattern that describes the subset of files and directories
- to return. May contain '?' and '*'.
+ glob_string: a globbing pattern that describes the subset of
+ files and directories to return. May contain '?' and '*'.
Returns:
A generator that yields all files and directories under the given root
directory that match the given globbing pattern.
See also :meth:`get_files_recursive`.
+
"""
for filename in get_files_recursive(directory):
- if fnmatch.fnmatch(filename, glob):
+ if fnmatch.fnmatch(filename, glob_string):
yield filename
*,
initial_delay: float = 1.0,
backoff_factor: float = 2.0,
- max_attempts=5,
+ max_attempts: int = 5,
) -> bool:
"""Attempt to acquire the lock repeatedly with retries and backoffs.
if self.locktime:
ts = datetime.datetime.now().timestamp()
duration = ts - self.locktime
- if (
- duration
- >= config.config[
- "lockfile_held_duration_warning_threshold"
- ].total_seconds()
- ):
- # Note: describe duration briefly only does 1s granularity...
+ warning_threshold = config.config[
+ "lockfile_held_duration_warning_threshold"
+ ]
+ assert warning_threshold
+ if duration >= warning_threshold.total_seconds():
+ # Note: describe duration briefly only does second-level granularity...
str_duration = datetime_utils.describe_duration_briefly(int(duration))
msg = f"Held {self.lockfile} for {str_duration}"
logger.warning(msg)
if self.is_locked:
self.release()
- def _signal(self, *args):
+ def _signal(self, *unused_args):
if self.is_locked:
self.release()
unvisited_nodes.remove(current_min_node)
self.dijkstra = (source, previous_nodes, shortest_path)
- def minimum_path_between(self, source: str, dest: str) -> Tuple[Numeric, List[str]]:
+ def minimum_path_between(
+ self, source: str, dest: str
+ ) -> Tuple[Optional[Numeric], List[str]]:
"""Compute the minimum path (lowest cost path) between source
and dest.
assert self.dijkstra
path = []
- node = dest
+ node: Optional[str] = dest
while node != source:
+ assert node
path.append(node)
node = self.dijkstra[1].get(node, None)
if node is None:
generators = {}
-def get(name: str, *, start=0) -> int:
+def get(name: str, *, start: int = 0) -> int:
"""
Returns a thread-safe, monotonically increasing id suitable for use
as a globally unique identifier.
if len(self.on_deck) > 0:
return self.on_deck[0]
try:
- item = self.source_iter.__next__()
+ item = next(self.source_iter)
self.on_deck.append(item)
return self.peek()
except StopIteration:
return self.resovoir
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
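# A standalone sketch of the one-item look-ahead used in peek() above: pull
# the next value into a small buffer so it can be inspected without being
# consumed. The class name is hypothetical, not the library's.
from typing import Any, Iterator, List, Optional

class SketchPeekableIterator:
    def __init__(self, source: Iterator[Any]) -> None:
        self.source = source
        self.on_deck: List[Any] = []

    def peek(self) -> Optional[Any]:
        if not self.on_deck:
            try:
                self.on_deck.append(next(self.source))
            except StopIteration:
                return None
        return self.on_deck[0]

    def __next__(self) -> Any:
        if self.on_deck:
            return self.on_deck.pop(0)
        return next(self.source)

it = SketchPeekableIterator(iter([1, 2, 3]))
assert it.peek() == 1 and next(it) == 1 and next(it) == 2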
return Counter(lst)
-def most_common(lst: List[Any], *, count=1) -> Any:
+def most_common(lst: List[Any], *, count: int = 1) -> Any:
"""
Return the N most common item in the list.
return remove_list_if_one_element([_[0] for _ in p.most_common()[0:count]])
-def least_common(lst: List[Any], *, count=1) -> Any:
+def least_common(lst: List[Any], *, count: int = 1) -> Any:
"""
Return the N least common item in the list.
return chain.from_iterable(combinations(seq, r) for r in range(len(seq) + 1))
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
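# A small worked example of the powerset construction above: combinations of
# every length, from the empty tuple up to the whole sequence, chained together.
from itertools import chain, combinations

def sketch_powerset(seq):
    return chain.from_iterable(combinations(seq, r) for r in range(len(seq) + 1))

assert list(sketch_powerset([1, 2])) == [(), (1,), (2,), (1, 2)]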
"""
if len(lst) <= 0:
raise ValueError("Need at least one number")
- elif len(lst) == 1:
+ if len(lst) == 1:
return lst[0]
assert len(lst) >= 2
gcd = gcd_floats(lst[0], lst[1])
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
+# pylint: disable=too-many-instance-attributes
+# pylint: disable=too-many-nested-blocks
# © Copyright 2021-2023, Scott Gasch
base_score = 0
for record in self.workers:
if worker.machine == record.machine:
- base_score = float(record.weight)
- base_score = 1.0 / base_score
- base_score *= 200.0
- base_score = int(base_score)
+ temp_score = float(record.weight)
+ temp_score = 1.0 / temp_score
+ temp_score *= 200.0
+ base_score = int(temp_score)
break
for uuid in bundle_uuids:
RemoteWorkerPoolProvider, persistent.JsonFileBasedPersistent
):
def __init__(self, json_remote_worker_pool: Dict[str, Any]):
- self.remote_worker_pool = []
+ self.remote_worker_pool: List[RemoteWorkerRecord] = []
for record in json_remote_worker_pool['remote_worker_records']:
self.remote_worker_pool.append(
dataclass_utils.dataclass_from_dict(RemoteWorkerRecord, record)
ppid = os.getppid()
pid = os.getpid()
tid = threading.current_thread().name
- return f'{ppid}/{pid}/{tid}:'
+ return f"{ppid}/{pid}/{tid}:"
def is_current_thread_main_thread() -> bool:
"""
def __init__(
- self, group=None, target=None, name=None, args=(), kwargs={}, Verbose=None
+ self, group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None
):
threading.Thread.__init__(
- self, group=None, target=target, name=None, args=args, kwargs=kwargs
+ self,
+ group=None,
+ target=target,
+ name=None,
+ args=args,
+ kwargs=kwargs,
+ daemon=daemon,
)
self._target = target
self._return = None
+ self._args = args
+ self._kwargs = kwargs
def run(self) -> None:
"""Create a little wrapper around invoking the real thread entry
return decorator_repeat
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
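# A standalone sketch of the return-value-capturing thread pattern above:
# run() stores the target's result so the caller can read it after join().
# The class name and result attribute are illustrative only.
import threading
from typing import Any

class SketchReturningThread(threading.Thread):
    def __init__(self, target, args=(), kwargs=None):
        super().__init__()
        self._target_fn = target
        self._args = args
        self._kwargs = kwargs or {}
        self.result: Any = None

    def run(self) -> None:
        self.result = self._target_fn(*self._args, **self._kwargs)

t = SketchReturningThread(target=lambda x: x * 2, args=(21,))
t.start()
t.join()
assert t.result == 42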
"""Remote worker entry point."""
in_file = config.config["code_file"]
- assert in_file and type(in_file) == str
+ assert in_file and isinstance(in_file, str)
out_file = config.config["result_file"]
- assert out_file and type(out_file) == str
+ assert out_file and isinstance(out_file, str)
thread = None
stop_event = None
#!/usr/bin/env python3
+# pylint: disable=too-many-nested-blocks
# © Copyright 2021-2023, Scott Gasch
class Document:
"""A class representing a searchable document."""
- docid: str = ''
+ docid: str = ""
"""A unique identifier for each document -- must be provided
by the caller. See :meth:`python_modules.id_generator.get` or
:meth:`python_modules.string_utils.generate_uuid` for potential
f'Invalid key:value syntax at "{tag}"'
) from v
- if key == '*':
+ if key == "*":
r = set()
for kv, s in self.corpus.docids_by_property.items():
- if value in ('*', kv[1]):
+ if value in ("*", kv[1]):
r.update(s)
else:
- if value == '*':
+ if value == "*":
r = self.corpus.get_docids_with_property(key)
else:
r = self.corpus.get_docids_by_property(key, value)
else:
- if tag == '*':
+ if tag == "*":
r = set()
for s in self.corpus.docids_by_tag.values():
r.update(s)
return retval
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
"""
from pyutils.exec_utils import cmd
- rows: Union[Optional[str], int] = os.environ.get('LINES', None)
- cols: Union[Optional[str], int] = os.environ.get('COLUMNS', None)
+ rows: Union[Optional[str], int] = os.environ.get("LINES", None)
+ cols: Union[Optional[str], int] = os.environ.get("COLUMNS", None)
if not rows or not cols:
try:
size = os.get_terminal_size()
cols = None
if not rows or not cols:
- logger.debug('Rows: %s, cols: %s, trying stty.', rows, cols)
+ logger.debug("Rows: %s, cols: %s, trying stty.", rows, cols)
try:
rows, cols = cmd(
"stty size",
cols = None
if not rows or not cols:
- raise Exception('Can\'t determine console size?!')
+ raise Exception("Can't determine console size?!")
return RowsColumns(int(rows), int(cols))
current: int,
total: int,
*,
- width=70,
+ width: int = 70,
text: BarGraphText = BarGraphText.PERCENTAGE,
- fgcolor=fg("school bus yellow"),
- left_end="[",
- right_end="]",
- redraw=True,
+ fgcolor: str = fg("school bus yellow"),
+ left_end: str = "[",
+ right_end: str = "]",
+ redraw: bool = True,
) -> None:
"""Draws a progress graph at the current cursor position.
if text == BarGraphText.NONE:
return ""
elif text == BarGraphText.PERCENTAGE:
- return f'{percentage:.1f}'
+ return f"{percentage:.1f}"
elif text == BarGraphText.FRACTION:
- return f'{current} / {total}'
+ return f"{current} / {total}"
raise ValueError(text)
total: int,
*,
text: BarGraphText = BarGraphText.PERCENTAGE,
- width=70,
- fgcolor=fg("school bus yellow"),
- reset_seq=reset(),
- left_end="[",
- right_end="]",
+ width: int = 70,
+ fgcolor: str = fg("school bus yellow"),
+ reset_seq: str = reset(),
+ left_end: str = "[",
+ right_end: str = "]",
) -> str:
"""Returns a string containing a bar graph.
percentage = 0.0
if percentage < 0.0 or percentage > 1.0:
raise ValueError(percentage)
- text = _make_bar_graph_text(text, current, total, percentage)
+ txt = _make_bar_graph_text(text, current, total, percentage)
whole_width = math.floor(percentage * width)
if whole_width == width:
whole_width -= 1
+ reset_seq
+ right_end
+ " "
- + text
+ + txt
)
(73, 104, '█▇▆▆▃▂▄▁')
"""
- _bar = '▁▂▃▄▅▆▇█' # Unicode: 9601, 9602, 9603, 9604, 9605, 9606, 9607, 9608
+ _bar = "▁▂▃▄▅▆▇█" # Unicode: 9601, 9602, 9603, 9604, 9605, 9606, 9607, 9608
barcount = len(_bar)
min_num, max_num = min(numbers), max(numbers)
span = max_num - min_num
- sline = ''.join(
+ sline = "".join(
_bar[min([barcount - 1, int((n - min_num) / span * barcount)])] for n in numbers
)
return min_num, max_num, sline
>>> distribute_strings(['this', 'is', 'a', 'test'], width=40)
' this is a test '
"""
- ret = ' ' + ' '.join(strings) + ' '
+ ret = " " + " ".join(strings) + " "
assert len(string_utils.strip_ansi_sequences(ret)) < width
x = 0
while len(string_utils.strip_ansi_sequences(ret)) < width:
- spaces = [m.start() for m in re.finditer(r' ([^ ]|$)', ret)]
+ spaces = [m.start() for m in re.finditer(r" ([^ ]|$)", ret)]
where = spaces[x]
before = ret[:where]
after = ret[where:]
'This is a test of the emergency\\nbroadcast system. This is only a test.'
"""
- retval = ''
- indent = ''
+ retval = ""
+ indent = ""
if indent_by > 0:
- indent += ' ' * indent_by
+ indent += " " * indent_by
line = indent
for word in text.split():
) > width:
line = line[1:]
line = justify_string(line, width=width, alignment=alignment)
- retval = retval + '\n' + line
+ retval = retval + "\n" + line
line = indent
- line = line + ' ' + word
+ line = line + " " + word
if len(string_utils.strip_ansi_sequences(line)) > 0:
- if alignment != 'j':
+ if alignment != "j":
retval += "\n" + justify_string(line[1:], width=width, alignment=alignment)
else:
retval += "\n" + line[1:]
out = ""
for pos, word in enumerate(line.split()):
width = max_width[pos]
- word = justify_string(word, width=width, alignment='l')
- out += f'{word} '
+ word = justify_string(word, width=width, alignment="l")
+ out += f"{word} "
yield out
The wrapped form of text
"""
chunks = text.split()
- out = ''
+ out = ""
width = 0
for chunk in chunks:
if width + len(string_utils.strip_ansi_sequences(chunk)) > n:
- out += '\n'
+ out += "\n"
width = 0
- out += chunk + ' '
+ out += chunk + " "
width += len(string_utils.strip_ansi_sequences(chunk)) + 1
return out
self,
*,
pad_prefix: Optional[str] = None,
- pad_char: str = ' ',
+ pad_char: str = " ",
pad_count: int = 4,
):
"""Construct an Indenter.
if pad_prefix is not None:
self.pad_prefix = pad_prefix
else:
- self.pad_prefix = ''
+ self.pad_prefix = ""
self.padding = pad_char * pad_count
def __enter__(self):
return False
def print(self, *arg, **kwargs):
- text = string_utils.sprintf(*arg, **kwargs)
- print(self.pad_prefix + self.padding * self.level + text, end='')
+ text = string_utils.sprintf(*arg, **kwargs)
+ print(self.pad_prefix + self.padding * self.level + text, end="")
def header(
*,
width: Optional[int] = None,
align: Optional[str] = None,
- style: Optional[str] = 'solid',
+ style: Optional[str] = "solid",
color: Optional[str] = None,
):
"""
width: how wide to make the header
align: "left" or "right"
style: "ascii", "solid" or "dashed"
+ color: what color to use, if any
Returns:
The header as a string.
except Exception:
width = 80
if not align:
- align = 'left'
+ align = "left"
if not style:
- style = 'ascii'
+ style = "ascii"
text_len = len(string_utils.strip_ansi_sequences(title))
- if align == 'left':
+ if align == "left":
left = 4
right = width - (left + text_len + 4)
- elif align == 'right':
+ elif align == "right":
right = 4
left = width - (right + text_len + 4)
else:
while left + text_len + 4 + right < width:
right += 1
- if style == 'solid':
- line_char = '━'
- begin = ''
- end = ''
- elif style == 'dashed':
- line_char = '┅'
- begin = ''
- end = ''
+ if style == "solid":
+ line_char = "━"
+ begin = ""
+ end = ""
+ elif style == "dashed":
+ line_char = "┅"
+ begin = ""
+ end = ""
else:
- line_char = '-'
- begin = '['
- end = ']'
+ line_char = "-"
+ begin = "["
+ end = "]"
if color:
col = color
reset_seq = reset()
else:
- col = ''
- reset_seq = ''
+ col = ""
+ reset_seq = ""
return (
line_char * left
+ begin
+ col
- + ' '
+ + " "
+ title
- + ' '
+ + " "
+ reset_seq
+ end
+ line_char * right
text: Optional[str] = None,
*,
width: int = 80,
- color: str = '',
+ color: str = "",
) -> str:
"""
Make a nice unicode box (optionally with color) around some text.
"""
assert width > 4
if text is not None:
- text = justify_text(text, width=width - 4, alignment='l')
+ text = justify_text(text, width=width - 4, alignment="l")
return preformatted_box(title, text, width=width, color=color)
title: Optional[str] = None,
text: Optional[str] = None,
*,
- width=80,
- color: str = '',
+ width: int = 80,
+ color: str = "",
) -> str:
"""Creates a nice box with rounded corners and returns it as a string.
╰──────────────────╯
"""
assert width > 4
- ret = ''
- if color == '':
- rset = ''
+ ret = ""
+ if color == "":
+ rset = ""
else:
rset = reset()
w = width - 2
- ret += color + '╭' + '─' * w + '╮' + rset + '\n'
+ ret += color + "╭" + "─" * w + "╮" + rset + "\n"
if title is not None:
ret += (
color
- + '│'
+ + "│"
+ rset
- + justify_string(title, width=w, alignment='c')
+ + justify_string(title, width=w, alignment="c")
+ color
- + '│'
+ + "│"
+ rset
- + '\n'
+ + "\n"
)
- ret += color + '│' + ' ' * w + '│' + rset + '\n'
+ ret += color + "│" + " " * w + "│" + rset + "\n"
if text is not None:
- for line in text.split('\n'):
+ for line in text.split("\n"):
tw = len(string_utils.strip_ansi_sequences(line))
assert tw <= w
ret += (
color
- + '│ '
+ + "│ "
+ rset
+ line
- + ' ' * (w - tw - 2)
+ + " " * (w - tw - 2)
+ color
- + ' │'
+ + " │"
+ rset
- + '\n'
+ + "\n"
)
- ret += color + '╰' + '─' * w + '╯' + rset + '\n'
+ ret += color + "╰" + "─" * w + "╯" + rset + "\n"
return ret
text: Optional[str] = None,
*,
width: int = 80,
- color: str = '',
+ color: str = "",
) -> None:
"""Draws a box with nice rounded corners.
│ OK │
╰────╯
"""
- print(preformatted_box(title, text, width=width, color=color), end='')
+ print(preformatted_box(title, text, width=width, color=color), end="")
-if __name__ == '__main__':
+if __name__ == "__main__":
import doctest
doctest.testmod()
def __repr__(self):
q = Decimal(10) ** -2
- sign, digits, exp = self.amount.quantize(q).as_tuple()
+ sign, digits, _ = self.amount.quantize(q).as_tuple()
result = []
digits = list(map(str, digits))
- build, next = result.append, digits.pop
+ build, nxt = result.append, digits.pop
for i in range(2):
- build(next() if digits else "0")
+ build(nxt() if digits else "0")
build(".")
if not digits:
build("0")
i = 0
while digits:
- build(next())
+ build(nxt())
i += 1
if i == 3 and digits:
i = 0
return wrapper_perf_monitor
-def check_all_methods_for_perf_regressions(prefix="test_"):
+def check_all_methods_for_perf_regressions(prefix: str = "test_"):
"""This decorator is meant to apply to classes that subclass from
:class:`unittest.TestCase` and, when applied, has the effect of
decorating each method that matches the `prefix` given with the
if indexfile is None:
if "unscrambler_default_indexfile" in config.config:
indexfile = config.config["unscrambler_default_indexfile"]
- assert type(indexfile) == str
+ assert isinstance(indexfile, str)
else:
indexfile = "/usr/share/dict/sparse_index"
else: