Make smart futures avoid polling.
[python_utils.git] / string_utils.py
index 5eb03d275e184af8709a87da3885b5827588c501..9a38d25c49cccddceec4da06ee8bbfe8133749aa 100644 (file)
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+import base64
 import contextlib
 import datetime
 import io
@@ -10,10 +11,12 @@ import numbers
 import random
 import re
 import string
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple
+from typing import Any, Callable, Dict, Iterable, List, Optional, Sequence, Tuple
 import unicodedata
 from uuid import uuid4
 
+import list_utils
+
 logger = logging.getLogger(__name__)
 
 NUMBER_RE = re.compile(r"^([+\-]?)((\d+)(\.\d+)?([e|E]\d+)?|\.\d+)$")
@@ -28,7 +31,7 @@ URLS_RAW_STRING = (
     r"([a-z-]+://)"  # scheme
     r"([a-z_\d-]+:[a-z_\d-]+@)?"  # user:password
     r"(www\.)?"  # www.
-    r"((?<!\.)[a-z\d]+[a-z\d.-]+\.[a-z]{2,6}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|localhost)"  # domain
+    r"((?<!\.)[a-z\d]+[a-z\d.-]+\.[a-z]{2,6}|\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}|localhost)" # domain
     r"(:\d{2,})?"  # port number
     r"(/[a-z\d_%+-]*)*"  # folders
     r"(\.[a-z\d_%+-]+)*"  # file extension
@@ -150,7 +153,7 @@ def is_none_or_empty(in_str: Optional[str]) -> bool:
     True
     >>> is_none_or_empty(None)
     True
-    >>> is_none_or_empty(" ")
+    >>> is_none_or_empty("   \t   ")
     True
     >>> is_none_or_empty('Test')
     False
@@ -175,18 +178,22 @@ def is_string(obj: Any) -> bool:
 
 
 def is_empty_string(in_str: Any) -> bool:
+    return is_empty(in_str)
+
+
+def is_empty(in_str: Any) -> bool:
     """
     Checks if input is a string and empty or only whitespace.
 
-    >>> is_empty_string('')
+    >>> is_empty('')
     True
-    >>> is_empty_string('    \t\t    ')
+    >>> is_empty('    \t\t    ')
     True
-    >>> is_empty_string('test')
+    >>> is_empty('test')
     False
-    >>> is_empty_string(100.88)
+    >>> is_empty(100.88)
     False
-    >>> is_empty_string([1, 2, 3])
+    >>> is_empty([1, 2, 3])
     False
     """
     return is_string(in_str) and in_str.strip() == ""
@@ -733,8 +740,6 @@ def is_ip(in_str: Any) -> bool:
     """
     Checks if a string is a valid ip (either v4 or v6).
 
-    *Examples:*
-
     >>> is_ip('255.200.100.75')
     True
     >>> is_ip('2001:db8:85a3:0000:0000:8a2e:370:7334')
@@ -786,6 +791,9 @@ def extract_mac_address(in_str: Any, *, separator: str = ":") -> Optional[str]:
     >>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F')
     '34:29:8F:12:0D:2F'
 
+    >>> extract_mac_address('? (10.0.0.30) at d8:5d:e2:34:54:86 on em0 expires in 1176 seconds [ethernet]')
+    'd8:5d:e2:34:54:86'
+
     """
     if not is_full_string(in_str):
         return None
@@ -1057,18 +1065,26 @@ def to_bool(in_str: str) -> bool:
 
     >>> to_bool('True')
     True
+
     >>> to_bool('1')
     True
+
     >>> to_bool('yes')
     True
+
     >>> to_bool('no')
     False
+
     >>> to_bool('huh?')
     False
+
+    >>> to_bool('on')
+    True
+
     """
     if not is_string(in_str):
         raise ValueError(in_str)
-    return in_str.lower() in ("true", "1", "yes", "y", "t")
+    return in_str.lower() in ("true", "1", "yes", "y", "t", "on")
 
 
 def to_date(in_str: str) -> Optional[datetime.date]:
@@ -1272,12 +1288,15 @@ def ngrams(txt: str, n: int):
 
     """
     words = txt.split()
-    return ngrams_presplit(words, n)
+    for ngram in ngrams_presplit(words, n):
+        ret = ''
+        for word in ngram:
+            ret += f'{word} '
+        yield ret.strip()
 
 
-def ngrams_presplit(words: Iterable[str], n: int):
-    for ngram in zip(*[words[i:] for i in range(n)]):
-        yield(' '.join(ngram))
+def ngrams_presplit(words: Sequence[str], n: int):
+    return list_utils.ngrams(words, n)
 
 
 def bigrams(txt: str):
@@ -1361,6 +1380,142 @@ def interpolate_using_dict(txt: str, values: Dict[str, str]) -> str:
     return sprintf(txt.format(**values), end='')
 
 
+def to_ascii(x: str):
+    """Encode as ascii bytes string.
+
+    >>> to_ascii('test')
+    b'test'
+
+    >>> to_ascii(b'1, 2, 3')
+    b'1, 2, 3'
+
+    """
+    if type(x) is str:
+        return x.encode('ascii')
+    if type(x) is bytes:
+        return x
+    raise Exception('to_ascii works with strings and bytes')
+
+
+def to_base64(txt: str, *, encoding='utf-8', errors='surrogatepass') -> str:
+    """Encode txt and then encode the bytes with a 64-character
+    alphabet.  This is compatible with uudecode.
+
+    >>> to_base64('hello?')
+    b'aGVsbG8/\\n'
+
+    """
+    return base64.encodebytes(txt.encode(encoding, errors))
+
+
+def is_base64(txt: str) -> bool:
+    """Determine whether a string is base64 encoded (with Python's standard
+    base64 alphabet which is the same as what uuencode uses).
+
+    >>> is_base64('test')    # all letters in the b64 alphabet
+    True
+
+    >>> is_base64('another test, how do you like this one?')
+    False
+
+    >>> is_base64(b'aGVsbG8/\\n')    # Ending newline is ok.
+    True
+
+    """
+    a = string.ascii_uppercase + string.ascii_lowercase + string.digits + '+/'
+    alphabet = set(a.encode('ascii'))
+    for char in to_ascii(txt.strip()):
+        if char not in alphabet:
+            return False
+    return True
+
+
+def from_base64(b64: str, encoding='utf-8', errors='surrogatepass') -> str:
+    """Convert base64 encoded string back to normal strings.
+
+    >>> from_base64(b'aGVsbG8/\\n')
+    'hello?'
+
+    """
+    return base64.decodebytes(b64).decode(encoding, errors)
+
+
+def chunk(txt: str, chunk_size):
+    """Chunk up a string.
+
+    >>> ' '.join(chunk('010011011100010110101010101010101001111110101000', 8))
+    '01001101 11000101 10101010 10101010 10011111 10101000'
+
+    """
+    if len(txt) % chunk_size != 0:
+        logger.warning(
+            f'String to chunk\'s length ({len(txt)} is not an even multiple of chunk_size ({chunk_size})')
+    for x in range(0, len(txt), chunk_size):
+        yield txt[x:x+chunk_size]
+
+
+def to_bitstring(txt: str, *, delimiter='', encoding='utf-8', errors='surrogatepass') -> str:
+    """Encode txt and then chop it into bytes.  Note: only bitstrings
+    with delimiter='' are interpretable by from_bitstring.
+
+    >>> to_bitstring('hello?')
+    '011010000110010101101100011011000110111100111111'
+
+    >>> to_bitstring('test', delimiter=' ')
+    '01110100 01100101 01110011 01110100'
+
+    >>> to_bitstring(b'test')
+    '01110100011001010111001101110100'
+
+    """
+    etxt = to_ascii(txt)
+    bits = bin(
+        int.from_bytes(
+            etxt,
+            'big'
+        )
+    )
+    bits = bits[2:]
+    return delimiter.join(chunk(bits.zfill(8 * ((len(bits) + 7) // 8)), 8))
+
+
+def is_bitstring(txt: str) -> bool:
+    """Is this a bitstring?
+
+    >>> is_bitstring('011010000110010101101100011011000110111100111111')
+    True
+
+    >>> is_bitstring('1234')
+    False
+
+    """
+    return is_binary_integer_number(f'0b{txt}')
+
+
+def from_bitstring(bits: str, encoding='utf-8', errors='surrogatepass') -> str:
+    """Convert from bitstring back to bytes then decode into a str.
+
+    >>> from_bitstring('011010000110010101101100011011000110111100111111')
+    'hello?'
+
+    """
+    n = int(bits, 2)
+    return n.to_bytes((n.bit_length() + 7) // 8, 'big').decode(encoding, errors) or '\0'
+
+
+def ip_v4_sort_key(txt: str) -> str:
+    """Turn an IPv4 address into a tuple for sorting purposes.
+
+    >>> ip_v4_sort_key('10.0.0.18')
+    (10, 0, 0, 18)
+
+    """
+    if not is_ip_v4(txt):
+        print(f"not IP: {txt}")
+        return None
+    return tuple([int(x) for x in txt.split('.')])
+
+
 if __name__ == '__main__':
     import doctest
     doctest.testmod()