From: Scott Gasch Date: Thu, 9 Sep 2021 06:29:05 +0000 (-0700) Subject: Add doctests to some of this stuff. X-Git-Url: https://wannabe.guru.org/gitweb/?a=commitdiff_plain;h=709370b2198e09f1dbe195fe8813602a3125b7f6;p=python_utils.git Add doctests to some of this stuff. --- diff --git a/dict_utils.py b/dict_utils.py index 74e8fda..6dd79f3 100644 --- a/dict_utils.py +++ b/dict_utils.py @@ -11,6 +11,21 @@ def init_or_inc( init_value: Any = 1, inc_function: Callable[..., Any] = lambda x: x + 1 ) -> bool: + """ + Initialize a dict value (if it doesn't exist) or increments it (using the + inc_function, which is customizable) if it already does exist. Returns + True if the key already existed or False otherwise. + + >>> d = {} + >>> init_or_inc(d, "test") + False + >>> init_or_inc(d, "test") + True + >>> init_or_inc(d, 'ing') + False + >>> d + {'test': 2, 'ing': 1} + """ if key in d.keys(): d[key] = inc_function(d[key]) return True @@ -19,6 +34,10 @@ def init_or_inc( def shard(d: Dict[Any, Any], size: int) -> Iterator[Dict[Any, Any]]: + """ + Shards a dict into N subdicts which, together, contain all keys/values + from the original unsharded dict. + """ items = d.items() for x in range(0, len(d), size): yield {key: value for (key, value) in islice(items, x, x + size)} @@ -42,11 +61,22 @@ def coalesce( *, aggregation_function: Callable[[Any, Any], Any] = coalesce_by_creating_list ) -> Dict[Any, Any]: + """Merge N dicts into one dict containing the union of all keys/values in + the input dicts. When keys collide, apply the aggregation_function which, + by default, creates a list of values. See also coalesce_by_creating_set or + provide a user defined aggregation_function. + + >>> a = {'a': 1, 'b': 2} + >>> b = {'b': 1, 'c': 2, 'd': 3} + >>> c = {'c': 1, 'd': 2} + >>> coalesce([a, b, c]) + {'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]} + """ out: Dict[Any, Any] = {} for d in inputs: for key in d: if key in out: - value = aggregation_function(d[key], out[key]) + value = aggregation_function(key, d[key], out[key]) else: value = d[key] out[key] = value @@ -54,32 +84,89 @@ def coalesce( def item_with_max_value(d: Dict[Any, Any]) -> Tuple[Any, Any]: + """Returns the key and value with the max value in a dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> item_with_max_value(d) + ('c', 3) + >>> item_with_max_value({}) + Traceback (most recent call last): + ... + ValueError: max() arg is an empty sequence + """ return max(d.items(), key=lambda _: _[1]) def item_with_min_value(d: Dict[Any, Any]) -> Tuple[Any, Any]: + """Returns the key and value with the min value in a dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> item_with_min_value(d) + ('a', 1) + """ return min(d.items(), key=lambda _: _[1]) def key_with_max_value(d: Dict[Any, Any]) -> Any: + """Returns the key with the max value in the dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> key_with_max_value(d) + 'c' + """ return item_with_max_value(d)[0] def key_with_min_value(d: Dict[Any, Any]) -> Any: + """Returns the key with the min value in the dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> key_with_min_value(d) + 'a' + """ return item_with_min_value(d)[0] def max_value(d: Dict[Any, Any]) -> Any: + """Returns the maximum value in the dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> max_value(d) + 3 + """ return item_with_max_value(d)[1] def min_value(d: Dict[Any, Any]) -> Any: + """Returns the minimum value in the dict. + + >>> d = {'a': 1, 'b': 2, 'c': 3} + >>> min_value(d) + 1 + """ return item_with_min_value(d)[1] def max_key(d: Dict[Any, Any]) -> Any: + """Returns the maximum key in dict (ignoring values totally) + + >>> d = {'a': 3, 'b': 2, 'c': 1} + >>> max_key(d) + 'c' + """ return max(d.keys()) def min_key(d: Dict[Any, Any]) -> Any: + """Returns the minimum key in dict (ignoring values totally) + + >>> d = {'a': 3, 'b': 2, 'c': 1} + >>> min_key(d) + 'a' + """ return min(d.keys()) + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/executors.py b/executors.py index 63efd81..e074c30 100644 --- a/executors.py +++ b/executors.py @@ -619,7 +619,7 @@ class RemoteExecutor(BaseExecutor): while True: try: - p.wait(timeout=0.5) + p.wait(timeout=0.25) except subprocess.TimeoutExpired: self.heartbeat() @@ -882,7 +882,7 @@ class DefaultExecutors(object): RemoteWorkerRecord( username = 'scott', machine = 'meerkat.cabin', - weight = 7, + weight = 6, count = 2, ), ) diff --git a/file_utils.py b/file_utils.py index 525a1af..7270e30 100644 --- a/file_utils.py +++ b/file_utils.py @@ -50,26 +50,48 @@ def create_path_if_not_exist(path, on_error=None): def does_file_exist(filename: str) -> bool: + """Returns True if a file exists and is a normal file. + + >>> does_file_exist(__file__) + True + """ return os.path.exists(filename) and os.path.isfile(filename) def does_directory_exist(dirname: str) -> bool: + """Returns True if a file exists and is a directory. + + >>> does_directory_exist('/tmp') + True + """ return os.path.exists(dirname) and os.path.isdir(dirname) def does_path_exist(pathname: str) -> bool: + """Just a more verbose wrapper around os.path.exists.""" return os.path.exists(pathname) def get_file_size(filename: str) -> int: + """Returns the size of a file in bytes.""" return os.path.getsize(filename) def is_normal_file(filename: str) -> bool: + """Returns True if filename is a normal file. + + >>> is_normal_file(__file__) + True + """ return os.path.isfile(filename) def is_directory(filename: str) -> bool: + """Returns True if filename is a directory. + + >>> is_directory('/tmp') + True + """ return os.path.isdir(filename) @@ -274,3 +296,8 @@ class FileWriter(object): if (ret >> 8) != 0: raise Exception(f'{cmd} failed, exit value {ret>>8}') return None + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/google_assistant.py b/google_assistant.py index a50003c..b34197a 100644 --- a/google_assistant.py +++ b/google_assistant.py @@ -45,10 +45,16 @@ audio_url: {self.audio_url}""" def tell_google(cmd: str, *, recognize_speech=True) -> GoogleResponse: + """Alias for ask_google.""" return ask_google(cmd, recognize_speech=recognize_speech) def ask_google(cmd: str, *, recognize_speech=True) -> GoogleResponse: + """Send a command string to Google via the google_assistant_bridge as the + user google_assistant_username and return the response. If recognize_speech + is True, perform speech recognition on the audio response from Google so as + to translate it into text (best effort, YMMV). + """ logging.debug(f"Asking google: '{cmd}'") payload = { "command": cmd, diff --git a/id_generator.py b/id_generator.py index c5a0d93..4e650dc 100644 --- a/id_generator.py +++ b/id_generator.py @@ -12,11 +12,22 @@ generators = {} def get(name: str) -> int: """ - def __init__(self): - self.my_unique_id = id_generator.get("student_id") + Returns a thread safe monotonically increasing id suitable for use + as a globally unique identifier. + + >>> import id_generator + >>> id_generator.get('student_id') + 0 + >>> id_generator.get('student_id') + 1 """ if name not in generators: generators[name] = itertools.count() x = next(generators[name]) logger.debug(f"Generated next id {x}") return x + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/list_utils.py b/list_utils.py index 7d3355c..993ca8a 100644 --- a/list_utils.py +++ b/list_utils.py @@ -5,16 +5,28 @@ from typing import Any, Iterator, List def shard(lst: List[Any], size: int) -> Iterator[Any]: - """Yield successive size-sized shards from lst.""" + """ + Yield successive size-sized shards from lst. + + >>> for sublist in shard([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 3): + ... [_ for _ in sublist] + [1, 2, 3] + [4, 5, 6] + [7, 8, 9] + [10, 11, 12] + + """ for x in range(0, len(lst), size): yield islice(lst, x, x + size) def flatten(lst: List[Any]) -> List[Any]: - """Flatten out a list: + """ + Flatten out a list: + + >>> flatten([ 1, [2, 3, 4, [5], 6], 7, [8, [9]]]) + [1, 2, 3, 4, 5, 6, 7, 8, 9] - >>> flatten([ 1, [2, 3, 4, [5], 6], 7, [8, [9]]]) - [1, 2, 3, 4, 5, 6, 7, 8, 9] """ if len(lst) == 0: return lst @@ -24,6 +36,17 @@ def flatten(lst: List[Any]) -> List[Any]: def prepend(item: Any, lst: List[Any]) -> List[Any]: - """Prepend an item to a list.""" - lst = list.insert(0, item) + """ + Prepend an item to a list. + + >>> prepend('foo', ['bar', 'baz']) + ['foo', 'bar', 'baz'] + + """ + lst.insert(0, item) return lst + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/lockfile.py b/lockfile.py index 34279ba..1e0516b 100644 --- a/lockfile.py +++ b/lockfile.py @@ -28,7 +28,14 @@ class LockFileContents: class LockFile(object): """A file locking mechanism that has context-manager support so you - can use it in a with statement. + can use it in a with statement. e.g. + + with LockFile('./foo.lock'): + # do a bunch of stuff... if the process dies we have a signal + # handler to do cleanup. Other code (in this process or another) + # that tries to take the same lockfile will block. There is also + # some logic for detecting stale locks. + """ def __init__( diff --git a/logical_search.py b/logical_search.py index 86c6352..805ec22 100644 --- a/logical_search.py +++ b/logical_search.py @@ -63,7 +63,32 @@ class Operation(enum.Enum): class Corpus(object): - """A collection of searchable documents.""" + """A collection of searchable documents. + + >>> c = Corpus() + >>> c.add_doc(Document( + ... docid=1, + ... tags=set(['urgent', 'important']), + ... properties=[ + ... ('author', 'Scott'), + ... ('subject', 'your anniversary') + ... ], + ... reference=None, + ... ) + ... ) + >>> c.add_doc(Document( + ... docid=2, + ... tags=set(['important']), + ... properties=[ + ... ('author', 'Joe'), + ... ('subject', 'your performance at work') + ... ], + ... reference=None, + ... ) + ... ) + >>> c.query('author:Scott and important') + {1} + """ def __init__(self) -> None: self.docids_by_tag: Dict[str, Set[str]] = defaultdict(set) @@ -133,15 +158,15 @@ class Corpus(object): def get_docids_with_property(self, key: str) -> Set[str]: """Return the set of docids that have a particular property no matter what that property's value. - """ + """ return self.docids_with_property[key] def get_docids_by_property(self, key: str, value: str) -> Set[str]: """Return the set of docids that have a particular property with a particular value.. - """ + """ return self.docids_by_property[(key, value)] def invert_docid_set(self, original: Set[str]) -> Set[str]: @@ -205,7 +230,6 @@ class Corpus(object): return operator_precedence(token) is not None def lex(query: str): - query = query.lower() tokens = query.split() for token in tokens: # Handle ( and ) operators stuck to the ends of tokens @@ -365,3 +389,8 @@ class Node(object): else: raise ParseError(f"Unexpected negation operand {_} ({type(_)})") return retval + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/math_utils.py b/math_utils.py index 6277123..fa0bc0e 100644 --- a/math_utils.py +++ b/math_utils.py @@ -7,6 +7,20 @@ from heapq import heappush, heappop class RunningMedian: + """A running median computer. + + >>> median = RunningMedian() + >>> median.add_number(1) + >>> median.add_number(10) + >>> median.add_number(3) + >>> median.get_median() + 3 + >>> median.add_number(7) + >>> median.add_number(5) + >>> median.get_median() + 5 + """ + def __init__(self): self.lowers, self.highers = [], [] @@ -55,19 +69,46 @@ def gcd_float_sequence(lst: List[float]) -> float: def truncate_float(n: float, decimals: int = 2): - """Truncate a float to a particular number of decimals.""" + """ + Truncate a float to a particular number of decimals. + + >>> truncate_float(3.1415927, 3) + 3.141 + + """ assert decimals > 0 and decimals < 10 multiplier = 10 ** decimals return int(n * multiplier) / multiplier def percentage_to_multiplier(percent: float) -> float: + """Given a percentage (e.g. 155%), return a factor needed to scale a + number by that percentage. + + >>> percentage_to_multiplier(155) + 2.55 + >>> percentage_to_multiplier(45) + 1.45 + >>> percentage_to_multiplier(-25) + 0.75 + + """ multiplier = percent / 100 multiplier += 1.0 return multiplier def multiplier_to_percent(multiplier: float) -> float: + """Convert a multiplicative factor into a percent change. + + >>> multiplier_to_percent(0.75) + -25.0 + >>> multiplier_to_percent(1.0) + 0.0 + >>> multiplier_to_percent(1.99) + 99.0 + + """ percent = multiplier if percent > 0.0: percent -= 1.0 @@ -79,7 +120,18 @@ def multiplier_to_percent(multiplier: float) -> float: @functools.lru_cache(maxsize=1024, typed=True) def is_prime(n: int) -> bool: - """Returns True if n is prime and False otherwise""" + """ + Returns True if n is prime and False otherwise. Obviously(?) very slow for + very large input numbers. + + >>> is_prime(13) + True + >>> is_prime(22) + False + >>> is_prime(51602981) + True + + """ if not isinstance(n, int): raise TypeError("argument passed to is_prime is not of 'int' type") @@ -100,3 +152,8 @@ def is_prime(n: int) -> bool: return False i = i + 6 return True + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/misc_utils.py b/misc_utils.py index a4757bd..fc1d5c2 100644 --- a/misc_utils.py +++ b/misc_utils.py @@ -2,5 +2,7 @@ import os + def is_running_as_root() -> bool: + """Returns True if running as root.""" return os.geteuid() == 0 diff --git a/stopwatch.py b/stopwatch.py index 1326cb1..cdd405b 100644 --- a/stopwatch.py +++ b/stopwatch.py @@ -6,11 +6,15 @@ from typing import Callable class Timer(object): """ - with timer.Timer() as t: - do_the_thing() + A stopwatch to time how long something takes (walltime). - walltime = t() - print(f'That took {walltime}s.') + e.g. + + with timer.Timer() as t: + do_the_thing() + + walltime = t() + print(f'That took {walltime} seconds.') """ def __init__(self) -> None: @@ -18,6 +22,9 @@ class Timer(object): self.end = None def __enter__(self) -> Callable[[], float]: + """Returns a functor that, when called, returns the walltime of the + operation in seconds. + """ self.start = time.perf_counter() self.end = 0.0 return lambda: self.end - self.start diff --git a/string_utils.py b/string_utils.py index bca2b70..5eb03d2 100644 --- a/string_utils.py +++ b/string_utils.py @@ -6,6 +6,7 @@ import io from itertools import zip_longest import json import logging +import numbers import random import re import string @@ -89,10 +90,18 @@ UUID_HEX_OK_RE = re.compile( SHALLOW_IP_V4_RE = re.compile(r"^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$") +ANYWHERE_IP_V4_RE = re.compile(r"\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}") + IP_V6_RE = re.compile(r"^([a-z\d]{0,4}:){7}[a-z\d]{0,4}$", re.IGNORECASE) +ANYWHERE_IP_V6_RE = re.compile(r"([a-z\d]{0,4}:){7}[a-z\d]{0,4}", re.IGNORECASE) + MAC_ADDRESS_RE = re.compile( - r"^([0-9A-F]{2}[:-]){5}([0-9A-F]{2})", re.IGNORECASE + r"^([0-9A-F]{2}[:-]){5}([0-9A-F]{2})$", re.IGNORECASE +) + +ANYWHERE_MAC_ADDRESS_RE = re.compile( + r"([0-9A-F]{2}[:-]){5}([0-9A-F]{2})", re.IGNORECASE ) WORDS_COUNT_RE = re.compile( @@ -134,27 +143,91 @@ NUM_SUFFIXES = { def is_none_or_empty(in_str: Optional[str]) -> bool: + """ + Returns true if the input string is either None or an empty string. + + >>> is_none_or_empty("") + True + >>> is_none_or_empty(None) + True + >>> is_none_or_empty(" ") + True + >>> is_none_or_empty('Test') + False + """ return in_str is None or len(in_str.strip()) == 0 def is_string(obj: Any) -> bool: """ Checks if an object is a string. + + >>> is_string('test') + True + >>> is_string(123) + False + >>> is_string(100.3) + False + >>> is_string([1, 2, 3]) + False """ return isinstance(obj, str) def is_empty_string(in_str: Any) -> bool: + """ + Checks if input is a string and empty or only whitespace. + + >>> is_empty_string('') + True + >>> is_empty_string(' \t\t ') + True + >>> is_empty_string('test') + False + >>> is_empty_string(100.88) + False + >>> is_empty_string([1, 2, 3]) + False + """ return is_string(in_str) and in_str.strip() == "" def is_full_string(in_str: Any) -> bool: + """ + Checks that input is a string and is not empty ('') or only whitespace. + + >>> is_full_string('test!') + True + >>> is_full_string('') + False + >>> is_full_string(' ') + False + >>> is_full_string(100.999) + False + >>> is_full_string({"a": 1, "b": 2}) + False + """ return is_string(in_str) and in_str.strip() != "" def is_number(in_str: str) -> bool: """ Checks if a string is a valid number. + + >>> is_number(100.5) + Traceback (most recent call last): + ... + ValueError: 100.5 + >>> is_number("100.5") + True + >>> is_number("test") + False + >>> is_number("99") + True + >>> is_number([1, 2, 3]) + Traceback (most recent call last): + ... + ValueError: [1, 2, 3] """ if not is_string(in_str): raise ValueError(in_str) @@ -167,10 +240,10 @@ def is_integer_number(in_str: str) -> bool: An integer may be signed or unsigned or use a "scientific notation". - *Examples:* - - >>> is_integer('42') # returns true - >>> is_integer('42.0') # returns false + >>> is_integer_number('42') + True + >>> is_integer_number('42.0') + False """ return ( (is_number(in_str) and "." not in in_str) or @@ -181,24 +254,89 @@ def is_integer_number(in_str: str) -> bool: def is_hexidecimal_integer_number(in_str: str) -> bool: + """ + Checks whether a string is a hex integer number. + + >>> is_hexidecimal_integer_number('0x12345') + True + >>> is_hexidecimal_integer_number('0x1A3E') + True + >>> is_hexidecimal_integer_number('1234') # Needs 0x + False + >>> is_hexidecimal_integer_number('-0xff') + True + >>> is_hexidecimal_integer_number('test') + False + >>> is_hexidecimal_integer_number(12345) # Not a string + Traceback (most recent call last): + ... + ValueError: 12345 + >>> is_hexidecimal_integer_number(101.4) + Traceback (most recent call last): + ... + ValueError: 101.4 + >>> is_hexidecimal_integer_number(0x1A3E) + Traceback (most recent call last): + ... + ValueError: 6718 + """ if not is_string(in_str): raise ValueError(in_str) return HEX_NUMBER_RE.match(in_str) is not None def is_octal_integer_number(in_str: str) -> bool: + """ + Checks whether a string is an octal number. + + >>> is_octal_integer_number('0o777') + True + >>> is_octal_integer_number('-0O115') + True + >>> is_octal_integer_number('0xFF') # Not octal, needs 0o + False + >>> is_octal_integer_number('7777') # Needs 0o + False + >>> is_octal_integer_number('test') + False + """ if not is_string(in_str): raise ValueError(in_str) return OCT_NUMBER_RE.match(in_str) is not None def is_binary_integer_number(in_str: str) -> bool: + """ + Returns whether a string contains a binary number. + + >>> is_binary_integer_number('0b10111') + True + >>> is_binary_integer_number('-0b111') + True + >>> is_binary_integer_number('0B10101') + True + >>> is_binary_integer_number('0b10102') + False + >>> is_binary_integer_number('0xFFF') + False + >>> is_binary_integer_number('test') + False + """ if not is_string(in_str): raise ValueError(in_str) return BIN_NUMBER_RE.match(in_str) is not None def to_int(in_str: str) -> int: + """Returns the integral value of the string or raises on error. + + >>> to_int('1234') + 1234 + >>> to_int('test') + Traceback (most recent call last): + ... + ValueError: invalid literal for int() with base 10: 'test' + """ if not is_string(in_str): raise ValueError(in_str) if is_binary_integer_number(in_str): @@ -216,13 +354,21 @@ def is_decimal_number(in_str: str) -> bool: A decimal may be signed or unsigned or use a "scientific notation". - >>> is_decimal('42.0') # returns true - >>> is_decimal('42') # returns false + >>> is_decimal_number('42.0') + True + >>> is_decimal_number('42') + False """ return is_number(in_str) and "." in in_str def strip_escape_sequences(in_str: str) -> str: + """ + Remove escape sequences in the input string. + + >>> strip_escape_sequences('this is a test!') + 'this is a test!' + """ in_str = ESCAPE_SEQUENCE_RE.sub("", in_str) return in_str @@ -233,7 +379,22 @@ def add_thousands_separator( separator_char = ',', places = 3 ) -> str: - if isinstance(in_str, int): + """ + Add thousands separator to a numeric string. Also handles numbers. + + >>> add_thousands_separator('12345678') + '12,345,678' + >>> add_thousands_separator(12345678) + '12,345,678' + >>> add_thousands_separator(12345678.99) + '12,345,678.99' + >>> add_thousands_separator('test') + Traceback (most recent call last): + ... + ValueError: test + + """ + if isinstance(in_str, numbers.Number): in_str = f'{in_str}' if is_number(in_str): return _add_thousands_separator( @@ -263,11 +424,12 @@ def is_url(in_str: Any, allowed_schemes: Optional[List[str]] = None) -> bool: """ Check if a string is a valid url. - *Examples:* - - >>> is_url('http://www.mysite.com') # returns true - >>> is_url('https://mysite.com') # returns true - >>> is_url('.mysite.com') # returns false + >>> is_url('http://www.mysite.com') + True + >>> is_url('https://mysite.com') + True + >>> is_url('.mysite.com') + False """ if not is_full_string(in_str): return False @@ -285,10 +447,10 @@ def is_email(in_str: Any) -> bool: Reference: https://tools.ietf.org/html/rfc3696#section-3 - *Examples:* - - >>> is_email('my.email@the-provider.com') # returns true - >>> is_email('@gmail.com') # returns false + >>> is_email('my.email@the-provider.com') + True + >>> is_email('@gmail.com') + False """ if ( not is_full_string(in_str) @@ -331,8 +493,12 @@ def is_email(in_str: Any) -> bool: def suffix_string_to_number(in_str: str) -> Optional[int]: """Take a string like "33Gb" and convert it into a number (of bytes) like 34603008. Return None if the input string is not valid. - """ + >>> suffix_string_to_number('1Mb') + 1048576 + >>> suffix_string_to_number('13.1Gb') + 14066017894 + """ def suffix_capitalize(s: str) -> str: if len(s) == 1: return s.upper() @@ -352,13 +518,21 @@ def suffix_string_to_number(in_str: str) -> Optional[int]: if multiplier is not None: r = rest[x] if is_integer_number(r): - return int(r) * multiplier + return to_int(r) * multiplier + if is_decimal_number(r): + return int(float(r) * multiplier) return None def number_to_suffix_string(num: int) -> Optional[str]: """Take a number (of bytes) and returns a string like "43.8Gb". Returns none if the input is invalid. + + >>> number_to_suffix_string(14066017894) + '13.1Gb' + >>> number_to_suffix_string(1024 * 1024) + '1.0Mb' + """ d = 0.0 suffix = None @@ -427,6 +601,16 @@ def is_snake_case(in_str: Any, *, separator: str = "_") -> bool: - it's composed only by lowercase/uppercase letters and digits - it contains at least one underscore (or provided separator) - it does not start with a number + + >>> is_snake_case('this_is_a_test') + True + >>> is_snake_case('___This_Is_A_Test_1_2_3___') + True + >>> is_snake_case('this-is-a-test') + False + >>> is_snake_case('this-is-a-test', separator='-') + True + """ if is_full_string(in_str): re_map = {"_": SNAKE_CASE_TEST_RE, "-": SNAKE_CASE_TEST_DASH_RE} @@ -447,11 +631,12 @@ def is_json(in_str: Any) -> bool: """ Check if a string is a valid json. - *Examples:* - - >>> is_json('{"name": "Peter"}') # returns true - >>> is_json('[1, 2, 3]') # returns true - >>> is_json('{nope}') # returns false + >>> is_json('{"name": "Peter"}') + True + >>> is_json('[1, 2, 3]') + True + >>> is_json('{nope}') + False """ if is_full_string(in_str) and JSON_WRAPPER_RE.match(in_str) is not None: try: @@ -465,11 +650,12 @@ def is_uuid(in_str: Any, allow_hex: bool = False) -> bool: """ Check if a string is a valid UUID. - *Example:* - - >>> is_uuid('6f8aa2f9-686c-4ac3-8766-5712354a04cf') # returns true - >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf') # returns false - >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf', allow_hex=True) # returns true + >>> is_uuid('6f8aa2f9-686c-4ac3-8766-5712354a04cf') + True + >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf') + False + >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf', allow_hex=True) + True """ # string casting is used to allow UUID itself as input data type s = str(in_str) @@ -482,11 +668,12 @@ def is_ip_v4(in_str: Any) -> bool: """ Checks if a string is a valid ip v4. - *Examples:* - - >>> is_ip_v4('255.200.100.75') # returns true - >>> is_ip_v4('nope') # returns false (not an ip) - >>> is_ip_v4('255.200.100.999') # returns false (999 is out of range) + >>> is_ip_v4('255.200.100.75') + True + >>> is_ip_v4('nope') + False + >>> is_ip_v4('255.200.100.999') # 999 out of range + False """ if not is_full_string(in_str) or SHALLOW_IP_V4_RE.match(in_str) is None: return False @@ -501,11 +688,14 @@ def is_ip_v4(in_str: Any) -> bool: def extract_ip_v4(in_str: Any) -> Optional[str]: """ Extracts the IPv4 chunk of a string or None. + + >>> extract_ip_v4(' The secret IP address: 127.0.0.1 (use it wisely) ') + '127.0.0.1' + >>> extract_ip_v4('Your mom dresses you funny.') """ if not is_full_string(in_str): return None - in_str.strip() - m = SHALLOW_IP_V4_RE.match(in_str) + m = ANYWHERE_IP_V4_RE.search(in_str) if m is not None: return m.group(0) return None @@ -515,10 +705,10 @@ def is_ip_v6(in_str: Any) -> bool: """ Checks if a string is a valid ip v6. - *Examples:* - - >>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:7334') # returns true - >>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:?') # returns false (invalid "?") + >>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:7334') + True + >>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:?') # invalid "?" + False """ return is_full_string(in_str) and IP_V6_RE.match(in_str) is not None @@ -526,11 +716,14 @@ def is_ip_v6(in_str: Any) -> bool: def extract_ip_v6(in_str: Any) -> Optional[str]: """ Extract IPv6 chunk or None. + + >>> extract_ip_v6('IP: 2001:db8:85a3:0000:0000:8a2e:370:7334') + '2001:db8:85a3:0000:0000:8a2e:370:7334' + >>> extract_ip_v6("(and she's ugly too, btw)") """ if not is_full_string(in_str): return None - in_str.strip() - m = IP_V6_RE.match(in_str) + m = ANYWHERE_IP_V6_RE.search(in_str) if m is not None: return m.group(0) return None @@ -542,15 +735,29 @@ def is_ip(in_str: Any) -> bool: *Examples:* - >>> is_ip('255.200.100.75') # returns true - >>> is_ip('2001:db8:85a3:0000:0000:8a2e:370:7334') # returns true - >>> is_ip('1.2.3') # returns false + >>> is_ip('255.200.100.75') + True + >>> is_ip('2001:db8:85a3:0000:0000:8a2e:370:7334') + True + >>> is_ip('1.2.3') + False + >>> is_ip('1.2.3.999') + False """ return is_ip_v6(in_str) or is_ip_v4(in_str) def extract_ip(in_str: Any) -> Optional[str]: - """Extract the IP address or None.""" + """ + Extract the IP address or None. + + >>> extract_ip('Attacker: 255.200.100.75') + '255.200.100.75' + >>> extract_ip('Remote host: 2001:db8:85a3:0000:0000:8a2e:370:7334') + '2001:db8:85a3:0000:0000:8a2e:370:7334' + >>> extract_ip('1.2.3') + + """ ip = extract_ip_v4(in_str) if ip is None: ip = extract_ip_v6(in_str) @@ -558,16 +765,32 @@ def extract_ip(in_str: Any) -> Optional[str]: def is_mac_address(in_str: Any) -> bool: - """Return True if in_str is a valid MAC address false otherwise.""" + """Return True if in_str is a valid MAC address false otherwise. + + >>> is_mac_address("34:29:8F:12:0D:2F") + True + >>> is_mac_address('34:29:8f:12:0d:2f') + True + >>> is_mac_address('34-29-8F-12-0D-2F') + True + >>> is_mac_address("test") + False + """ return is_full_string(in_str) and MAC_ADDRESS_RE.match(in_str) is not None def extract_mac_address(in_str: Any, *, separator: str = ":") -> Optional[str]: - """Extract the MAC address from in_str""" + """ + Extract the MAC address from in_str. + + >>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F') + '34:29:8F:12:0D:2F' + + """ if not is_full_string(in_str): return None in_str.strip() - m = MAC_ADDRESS_RE.match(in_str) + m = ANYWHERE_MAC_ADDRESS_RE.search(in_str) if m is not None: mac = m.group(0) mac.replace(":", separator) @@ -580,16 +803,11 @@ def is_slug(in_str: Any, separator: str = "-") -> bool: """ Checks if a given string is a slug (as created by `slugify()`). - *Examples:* - - >>> is_slug('my-blog-post-title') # returns true - >>> is_slug('My blog post title') # returns false + >>> is_slug('my-blog-post-title') + True + >>> is_slug('My blog post title') + False - :param in_str: String to check. - :type in_str: str - :param separator: Join sign used by the slug. - :type separator: str - :return: True if slug, false otherwise. """ if not is_full_string(in_str): return False @@ -604,10 +822,11 @@ def contains_html(in_str: str) -> bool: By design, this function matches ANY type of tag, so don't expect to use it as an HTML validator, its goal is to detect "malicious" or undesired tags in the text. - *Examples:* + >>> contains_html('my string is bold') + True + >>> contains_html('my string is not bold') + False - >>> contains_html('my string is bold') # returns true - >>> contains_html('my string is not bold') # returns false """ if not is_string(in_str): raise ValueError(in_str) @@ -623,10 +842,11 @@ def words_count(in_str: str) -> int: Moreover it is aware of punctuation, so the count for a string like "one,two,three.stop" will be 4 not 1 (even if there are no spaces in the string). - *Examples:* + >>> words_count('hello world') + 2 + >>> words_count('one,two,three.stop') + 4 - >>> words_count('hello world') # returns 2 - >>> words_count('one,two,three.stop') # returns 4 """ if not is_string(in_str): raise ValueError(in_str) @@ -637,10 +857,9 @@ def generate_uuid(as_hex: bool = False) -> str: """ Generated an UUID string (using `uuid.uuid4()`). - *Examples:* + generate_uuid() # possible output: '97e3a716-6b33-4ab9-9bb1-8128cb24d76b' + generate_uuid(as_hex=True) # possible output: '97e3a7166b334ab99bb18128cb24d76b' - >>> uuid() # possible output: '97e3a716-6b33-4ab9-9bb1-8128cb24d76b' - >>> uuid(as_hex=True) # possible output: '97e3a7166b334ab99bb18128cb24d76b' """ uid = uuid4() if as_hex: @@ -653,9 +872,8 @@ def generate_random_alphanumeric_string(size: int) -> str: Returns a string of the specified size containing random characters (uppercase/lowercase ascii letters and digits). - *Example:* + random_string(9) # possible output: "cx3QQbzYg" - >>> random_string(9) # possible output: "cx3QQbzYg" """ if size < 1: raise ValueError("size must be >= 1") @@ -667,6 +885,10 @@ def generate_random_alphanumeric_string(size: int) -> str: def reverse(in_str: str) -> str: """ Returns the string with its chars reversed. + + >>> reverse('test') + 'tset' + """ if not is_string(in_str): raise ValueError(in_str) @@ -677,6 +899,11 @@ def camel_case_to_snake_case(in_str, *, separator="_"): """ Convert a camel case string into a snake case one. (The original string is returned if is not a valid camel case string) + + >>> camel_case_to_snake_case('MacAddressExtractorFactory') + 'mac_address_extractor_factory' + >>> camel_case_to_snake_case('Luke Skywalker') + 'Luke Skywalker' """ if not is_string(in_str): raise ValueError(in_str) @@ -693,6 +920,11 @@ def snake_case_to_camel_case( """ Convert a snake case string into a camel case one. (The original string is returned if is not a valid snake case string) + + >>> snake_case_to_camel_case('this_is_a_test') + 'ThisIsATest' + >>> snake_case_to_camel_case('Han Solo') + 'Han Solo' """ if not is_string(in_str): raise ValueError(in_str) @@ -705,12 +937,22 @@ def snake_case_to_camel_case( def to_char_list(in_str: str) -> List[str]: + """Convert a string into a list of chars. + + >>> to_char_list('test') + ['t', 'e', 's', 't'] + """ if not is_string(in_str): return [] return list(in_str) def from_char_list(in_list: List[str]) -> str: + """Convert a char list into a string. + + >>> from_char_list(['t', 'e', 's', 't']) + 'test' + """ return "".join(in_list) @@ -731,10 +973,10 @@ def strip_html(in_str: str, keep_tag_content: bool = False) -> str: """ Remove html code contained into the given string. - *Examples:* - - >>> strip_html('test: click here') # returns 'test: ' - >>> strip_html('test: click here', keep_tag_content=True) # returns 'test: click here' + >>> strip_html('test: click here') + 'test: ' + >>> strip_html('test: click here', keep_tag_content=True) + 'test: click here' """ if not is_string(in_str): raise ValueError(in_str) @@ -744,14 +986,14 @@ def strip_html(in_str: str, keep_tag_content: bool = False) -> str: def asciify(in_str: str) -> str: """ - Force string content to be ascii-only by translating all non-ascii chars into the closest possible representation - (eg: ó -> o, Ë -> E, ç -> c...). - - **Bear in mind**: Some chars may be lost if impossible to translate. + Force string content to be ascii-only by translating all non-ascii + chars into the closest possible representation (eg: ó -> o, Ë -> + E, ç -> c...). - *Example:* + N.B. Some chars may be lost if impossible to translate. - >>> asciify('èéùúòóäåëýñÅÀÁÇÌÍÑÓË') # returns 'eeuuooaaeynAAACIINOE' + >>> asciify('èéùúòóäåëýñÅÀÁÇÌÍÑÓË') + 'eeuuooaaeynAAACIINOE' """ if not is_string(in_str): raise ValueError(in_str) @@ -780,10 +1022,10 @@ def slugify(in_str: str, *, separator: str = "-") -> str: - all chars are encoded as ascii (by using `asciify()`) - is safe for URL - *Examples:* - - >>> slugify('Top 10 Reasons To Love Dogs!!!') # returns: 'top-10-reasons-to-love-dogs' - >>> slugify('Mönstér Mägnët') # returns 'monster-magnet' + >>> slugify('Top 10 Reasons To Love Dogs!!!') + 'top-10-reasons-to-love-dogs' + >>> slugify('Mönstér Mägnët') + 'monster-magnet' """ if not is_string(in_str): raise ValueError(in_str) @@ -803,7 +1045,8 @@ def to_bool(in_str: str) -> bool: """ Turns a string into a boolean based on its content (CASE INSENSITIVE). - A positive boolean (True) is returned if the string value is one of the following: + A positive boolean (True) is returned if the string value is one + of the following: - "true" - "1" @@ -811,6 +1054,17 @@ def to_bool(in_str: str) -> bool: - "y" Otherwise False is returned. + + >>> to_bool('True') + True + >>> to_bool('1') + True + >>> to_bool('yes') + True + >>> to_bool('no') + False + >>> to_bool('huh?') + False """ if not is_string(in_str): raise ValueError(in_str) @@ -818,6 +1072,9 @@ def to_bool(in_str: str) -> bool: def to_date(in_str: str) -> Optional[datetime.date]: + """ + Parses a date string. See DateParser docs for details. + """ import dateparse.dateparse_utils as dp try: d = dp.DateParser() @@ -829,6 +1086,9 @@ def to_date(in_str: str) -> Optional[datetime.date]: def valid_date(in_str: str) -> bool: + """ + True if the string represents a valid date. + """ import dateparse.dateparse_utils as dp try: d = dp.DateParser() @@ -840,6 +1100,9 @@ def valid_date(in_str: str) -> bool: def to_datetime(in_str: str) -> Optional[datetime.datetime]: + """ + Parses a datetime string. See DateParser docs for more info. + """ import dateparse.dateparse_utils as dp try: d = dp.DateParser() @@ -852,6 +1115,9 @@ def to_datetime(in_str: str) -> Optional[datetime.datetime]: def valid_datetime(in_str: str) -> bool: + """ + True if the string represents a valid datetime. + """ _ = to_datetime(in_str) if _ is not None: return True @@ -862,20 +1128,6 @@ def valid_datetime(in_str: str) -> bool: def dedent(in_str: str) -> str: """ Removes tab indentation from multi line strings (inspired by analogous Scala function). - - *Example:* - - >>> strip_margin(''' - >>> line 1 - >>> line 2 - >>> line 3 - >>> ''') - >>> # returns: - >>> ''' - >>> line 1 - >>> line 2 - >>> line 3 - >>> ''' """ if not is_string(in_str): raise ValueError(in_str) @@ -885,6 +1137,13 @@ def dedent(in_str: str) -> str: def indent(in_str: str, amount: int) -> str: + """ + Indents string by prepending amount spaces. + + >>> indent('This is a test', 4) + ' This is a test' + + """ if not is_string(in_str): raise ValueError(in_str) line_separator = '\n' @@ -893,6 +1152,7 @@ def indent(in_str: str, amount: int) -> str: def sprintf(*args, **kwargs) -> str: + """String printf, like in C""" ret = "" sep = kwargs.pop("sep", None) @@ -924,6 +1184,15 @@ def sprintf(*args, **kwargs) -> str: class SprintfStdout(object): + """ + A context manager that captures outputs to stdout. + + with SprintfStdout() as buf: + print("test") + print(buf()) + + 'test\n' + """ def __init__(self) -> None: self.destination = io.StringIO() self.recorder = None @@ -940,18 +1209,48 @@ class SprintfStdout(object): def is_are(n: int) -> str: + """Is or are? + + >>> is_are(1) + 'is' + >>> is_are(2) + 'are' + + """ if n == 1: return "is" return "are" def pluralize(n: int) -> str: + """Add an s? + + >>> pluralize(15) + 's' + >>> count = 1 + >>> print(f'There {is_are(count)} {count} file{pluralize(count)}.') + There is 1 file. + >>> count = 4 + >>> print(f'There {is_are(count)} {count} file{pluralize(count)}.') + There are 4 files. + + """ if n == 1: return "" return "s" def thify(n: int) -> str: + """Return the proper cardinal suffix for a number. + + >>> thify(1) + 'st' + >>> thify(33) + 'rd' + >>> thify(16) + 'th' + + """ digit = str(n) assert is_integer_number(digit) digit = digit[-1:] @@ -966,6 +1265,12 @@ def thify(n: int) -> str: def ngrams(txt: str, n: int): + """Return the ngrams from a string. + + >>> [x for x in ngrams('This is a test', 2)] + ['This is', 'is a', 'a test'] + + """ words = txt.split() return ngrams_presplit(words, n) @@ -983,35 +1288,79 @@ def trigrams(txt: str): return ngrams(txt, 3) -def shuffle_columns( - txt: Iterable[str], - specs: Iterable[Iterable[int]], +def shuffle_columns_into_list( + input_lines: Iterable[str], + column_specs: Iterable[Iterable[int]], delim='' ) -> Iterable[str]: + """Helper to shuffle / parse columnar data and return the results as a + list. The column_specs argument is an iterable collection of + numeric sequences that indicate one or more column numbers to + copy. + + >>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split() + >>> shuffle_columns_into_list( + ... cols, + ... [ [8], [2, 3], [5, 6, 7] ], + ... delim=' ', + ... ) + ['acl_test.py', 'scott wheel', 'Jul 9 11:34'] + + """ out = [] - for spec in specs: + + # Column specs map input lines' columns into outputs. + # [col1, col2...] + for spec in column_specs: chunk = '' for n in spec: - chunk = chunk + delim + txt[n] + chunk = chunk + delim + input_lines[n] chunk = chunk.strip(delim) out.append(chunk) return out def shuffle_columns_into_dict( - txt: Iterable[str], - specs: Iterable[Tuple[str, Iterable[int]]], + input_lines: Iterable[str], + column_specs: Iterable[Tuple[str, Iterable[int]]], delim='' ) -> Dict[str, str]: + """Helper to shuffle / parse columnar data and return the results + as a dict. + + >>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split() + >>> shuffle_columns_into_dict( + ... cols, + ... [ ('filename', [8]), ('owner', [2, 3]), ('mtime', [5, 6, 7]) ], + ... delim=' ', + ... ) + {'filename': 'acl_test.py', 'owner': 'scott wheel', 'mtime': 'Jul 9 11:34'} + + """ out = {} - for spec in specs: + + # Column specs map input lines' columns into outputs. + # "key", [col1, col2...] + for spec in column_specs: chunk = '' for n in spec[1]: - chunk = chunk + delim + txt[n] + chunk = chunk + delim + input_lines[n] chunk = chunk.strip(delim) out[spec[0]] = chunk return out def interpolate_using_dict(txt: str, values: Dict[str, str]) -> str: + """Interpolate a string with data from a dict. + + >>> interpolate_using_dict('This is a {adjective} {noun}.', + ... {'adjective': 'good', 'noun': 'example'}) + 'This is a good example.' + + """ return sprintf(txt.format(**values), end='') + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/tests/parallelize_test.py b/tests/parallelize_test.py index d87b5e7..9d98710 100755 --- a/tests/parallelize_test.py +++ b/tests/parallelize_test.py @@ -37,20 +37,30 @@ def list_primes(n): @decorator_utils.timed -def driver() -> None: +def test_thread_parallelization() -> None: results = [] - for _ in range(20): - results.append(compute_factorial_process(_)) - for future in smart_future.wait_any(results): - print(f'Process: {future}') - - results = [] - for _ in range(20): + for _ in range(50): results.append(compute_factorial_thread(_)) smart_future.wait_all(results) for future in results: print(f'Thread: {future}') + texecutor = executors.DefaultExecutors().thread_pool() + texecutor.shutdown() + + +@decorator_utils.timed +def test_process_parallelization() -> None: + results = [] + for _ in range(50): + results.append(compute_factorial_process(_)) + for future in smart_future.wait_any(results): + print(f'Process: {future}') + pexecutor = executors.DefaultExecutors().process_pool() + pexecutor.shutdown() + +@decorator_utils.timed +def test_remote_parallelization() -> None: results = {} for _ in range(50): n = random.randint(0, 100000) @@ -59,66 +69,17 @@ def driver() -> None: for _ in results[n]: tot += _ print(tot) + rexecutor = executors.DefaultExecutors().remote_pool() + rexecutor.shutdown() @bootstrap.initialize def main() -> None: - print(driver()) - pexecutor = executors.DefaultExecutors().process_pool() - pexecutor.shutdown() - texecutor = executors.DefaultExecutors().thread_pool() - texecutor.shutdown() - rexecutor = executors.DefaultExecutors().remote_pool() - rexecutor.shutdown() + test_thread_parallelization() + test_process_parallelization() + test_remote_parallelization() sys.exit(0) if __name__ == '__main__': main() - -# print """Usage: python sum_primes.py [ncpus] -# [ncpus] - the number of workers to run in parallel, -# if omitted it will be set to the number of processors in the system -# """ - -# # tuple of all parallel python servers to connect with -# ppservers = () -# #ppservers = ("10.0.0.1",) - -# if len(sys.argv) > 1: -# ncpus = int(sys.argv[1]) -# # Creates jobserver with ncpus workers -# job_server = pp.Server(ncpus, ppservers=ppservers) -# else: -# # Creates jobserver with automatically detected number of workers -# job_server = pp.Server(ppservers=ppservers) - -# print "Starting pp with", job_server.get_ncpus(), "workers" - -# # Submit a job of calulating sum_primes(100) for execution. -# # sum_primes - the function -# # (100,) - tuple with arguments for sum_primes -# # (isprime,) - tuple with functions on which function sum_primes depends -# # ("math",) - tuple with module names which must be imported before sum_primes execution -# # Execution starts as soon as one of the workers will become available -# job1 = job_server.submit(sum_primes, (100,), (isprime,), ("math",)) - -# # Retrieves the result calculated by job1 -# # The value of job1() is the same as sum_primes(100) -# # If the job has not been finished yet, execution will wait here until result is available -# result = job1() - -# print "Sum of primes below 100 is", result - -# start_time = time.time() - -# # The following submits 8 jobs and then retrieves the results -# inputs = (100000, 100100, 100200, 100300, 100400, 100500, 100600, 100700) -# jobs = [(input, job_server.submit(sum_primes,(input,), (isprime,), ("math",))) for input in inputs] -# for input, job in jobs: -# print "Sum of primes below", input, "is", job() - -# print "Time elapsed: ", time.time() - start_time, "s" -# job_server.print_stats() - -# # Parallel Python Software: http://www.parallelpython.com diff --git a/tests/run_all_tests.sh b/tests/run_all_tests.sh index c2f9f93..25365bb 100755 --- a/tests/run_all_tests.sh +++ b/tests/run_all_tests.sh @@ -1,5 +1,10 @@ #!/bin/bash +for doctest in $(grep -l doctest ../*.py); do + echo "------------------------- ${doctest} -------------------------" + python3 ${doctest} +done + for test in $(ls *_test.py); do if [ "${test}" != "parallelize_test.py" ]; then echo "------------------------- ${test} -------------------------" diff --git a/text_utils.py b/text_utils.py index 3be32ff..8ea6e19 100644 --- a/text_utils.py +++ b/text_utils.py @@ -16,6 +16,8 @@ class RowsColumns(NamedTuple): def get_console_rows_columns() -> RowsColumns: + """Returns the number of rows/columns on the current console.""" + from exec_utils import cmd rows, columns = cmd("stty size").split() return RowsColumns(int(rows), int(columns)) @@ -31,6 +33,8 @@ def progress_graph( right_end="]", redraw=True, ) -> None: + """Draws a progress graph.""" + percent = current / total ret = "\r" if redraw else "\n" bar = bar_graph( @@ -53,9 +57,17 @@ def bar_graph( include_text=True, width=70, fgcolor=fg("school bus yellow"), + reset=reset(), left_end="[", right_end="]", ) -> None: + """Returns a string containing a bar graph. + + >>> bar_graph(0.5, fgcolor='', reset='') + '[███████████████████████████████████ ] 50.0%' + + """ + if percentage < 0.0 or percentage > 1.0: raise ValueError(percentage) if include_text: @@ -75,7 +87,7 @@ def bar_graph( fgcolor + "█" * whole_width + part_char + " " * (width - whole_width - 1) + - reset() + + reset + right_end + " " + text) @@ -87,6 +99,17 @@ def distribute_strings( alignment: str = "c", padding: str = " ", ) -> str: + """ + Distributes strings into a line with a particular justification. + + >>> distribute_strings(['this', 'is', 'a', 'test'], width=40) + ' this is a test ' + >>> distribute_strings(['this', 'is', 'a', 'test'], width=40, alignment='l') + 'this is a test ' + >>> distribute_strings(['this', 'is', 'a', 'test'], width=40, alignment='r') + ' this is a test' + + """ subwidth = math.floor(width / len(strings)) retval = "" for string in strings: @@ -100,6 +123,15 @@ def distribute_strings( def justify_string_by_chunk( string: str, width: int = 80, padding: str = " " ) -> str: + """ + Justifies a string. + + >>> justify_string_by_chunk("This is a test", 40) + 'This is a test' + >>> justify_string_by_chunk("This is a test", 20) + 'This is a test' + + """ padding = padding[0] first, *rest, last = string.split() w = width - (len(first) + 1 + len(last) + 1) @@ -115,6 +147,18 @@ def justify_string_by_chunk( def justify_string( string: str, *, width: int = 80, alignment: str = "c", padding: str = " " ) -> str: + """Justify a string. + + >>> justify_string('This is another test', width=40, alignment='c') + ' This is another test ' + >>> justify_string('This is another test', width=40, alignment='l') + 'This is another test ' + >>> justify_string('This is another test', width=40, alignment='r') + ' This is another test' + >>> justify_string('This is another test', width=40, alignment='j') + 'This is another test' + + """ alignment = alignment[0] padding = padding[0] while len(string) < width: @@ -139,7 +183,13 @@ def justify_string( def justify_text(text: str, *, width: int = 80, alignment: str = "c") -> str: - print("-" * width) + """ + Justifies text. + + >>> justify_text('This is a test of the emergency broadcast system. This is only a test.', + ... width=40, alignment='j') #doctest: +NORMALIZE_WHITESPACE + 'This is a test of the emergency\\nbroadcast system. This is only a test.' + """ retval = "" line = "" for word in text.split(): @@ -216,3 +266,8 @@ class Indenter: import string_utils text = string_utils.sprintf(*arg, **kwargs) print(self.pad_prefix + self.padding * self.level + text, end='') + + +if __name__ == '__main__': + import doctest + doctest.testmod() diff --git a/thread_utils.py b/thread_utils.py index bb15c03..0130cdc 100644 --- a/thread_utils.py +++ b/thread_utils.py @@ -20,6 +20,9 @@ def current_thread_id() -> str: def is_current_thread_main_thread() -> bool: + """Returns True is the current (calling) thread is the process' main + thread and False otherwise. + """ return threading.current_thread() is threading.main_thread() diff --git a/type_utils.py b/type_utils.py index 7b79af0..ee52444 100644 --- a/type_utils.py +++ b/type_utils.py @@ -7,6 +7,11 @@ logger = logging.getLogger(__name__) def unwrap_optional(x: Optional[Any]) -> Any: + """Unwrap an Optional[Type] argument returning a Type value back. + If the Optional[Type] argument is None, however, raise an exception. + Use this to satisfy most type checkers that a value that could + be None isn't so as to drop the Optional. + """ if x is None: msg = 'Argument to unwrap_optional was unexpectedly None' logger.critical(msg)