pyutils package
Introduction
When I was writing little tools in Python and found myself implementing a generally useful pattern I stuffed it into a local library. That library grew into pyutils: a set of collections, helpers and utilities that I find useful and hope you will too.
This documentation is automatically generated at every git push by Sphinx and is hosted at https://wannabe.guru.org/pydocs/pyutils/pyutils.html. You can generate it yourself by running make html (with GNU make) under the docs/ folder.
The repo now lives on GitHub but a lot of the development happened against a local git server.
The LICENSE and NOTICE files at the root of the project describe the terms for reusing this code and where everything came from.
Installation
This project is now available as pyutils on PyPI, the default Python package index. To install with pip:
pip install pyutils
You’ll get a few dependencies and this library. The dependencies are high quality and stable:
antlr4-python3-runtime: ANTLR grammar/parser runtime dependencies for the date parsing grammar.
bitstring: easy bitwise operations on long operands
cloudpickle: a better version of Python’s built-in pickle, used in the parallelize code.
holidays: a list of US and international holidays, used in the date parser.
kazoo: a client side library with recipes for working with Apache Zookeeper, used if and only if you enable Zookeeper-based configs.
overrides: code decorator to mark and enforce method overrides.
pytz: Python timezones, used in date parser and utils.
You can also install the wheel directly; the latest is checked in under: https://github.com/scottgasch/pyutils/tree/main/dist. To do so, download it, check that the MD5 matches, and run:
pip install <filename.whl>
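If it helps, here is a small sketch (the wheel filename below is a placeholder, not a real release artifact) of computing the MD5 digest in Python so you can compare it against the published checksum before installing:

import hashlib

# Compute the MD5 of the downloaded wheel and compare the hex digest
# against the published checksum before running pip install on it.
# The filename here is a placeholder.
with open("pyutils-X.Y.Z-py3-none-any.whl", "rb") as f:
    print(hashlib.md5(f.read()).hexdigest())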
Development
All of the project’s code is located under src/pyutils/. Most code includes inline documentation and doctests. I’ve tried to organize it into logical packages based on the code’s functionality. Note that when a name would collide with a Python standard library module or reserved keyword, I’ve appended a ‘z’, e.g. ‘collectionz’ instead of ‘collections’ and ‘typez’ instead of ‘types’.
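For example, imports of those packages look like this (module names taken from the package list later in this document):

# The trailing 'z' avoids shadowing the standard library's
# collections and types modules.
from pyutils.collectionz import trie
from pyutils.typez import money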
Some example code that uses various features of this project is checked in under examples/. See the README in that directory for more information about what’s there.
Unit and integration tests are under tests/.
To run all tests:
cd tests/
./run_tests.py --all [--coverage]
See the README under tests/ and the code of run_tests.py for more options / information about running tests.
Package code is available to clone from git at https://github.com/scottgasch/pyutils. For a long time this was just a local library on my machine that my tools imported, but I’ve now decided to release it on PyPI; earlier development happened against a local git server.
To actually build the code (by which I mean type check it, lint it, package it, format it, etc…) you need some other dependencies installed:
black: I use black to auto-format the code
mypy: a Python type checker
coverage: used by the --coverage option of run_tests.py
flake8: a Python linter
pylint: another Python linter
sphinx: documentation generator
setuptools: to build the project artifacts
twine: to upload built packages to PyPI
Documentation
The documentation you’re browsing was created by Sphinx based largely on extracted code comments. It’s available at https://wannabe.guru.org/pydocs/pyutils/pyutils.html.
Support
Drop me a line if you are using this, find a bug, have a question, or have a suggestion:
–Scott Gasch (scott.gasch@gmail.com)
Subpackages
- pyutils.collectionz package
- Submodules
- pyutils.collectionz.bidict module
- pyutils.collectionz.bst module
BinarySearchTree
BinarySearchTree.depth()
BinarySearchTree.get_next_node()
BinarySearchTree.get_nodes_in_range_inclusive()
BinarySearchTree.get_root()
BinarySearchTree.height()
BinarySearchTree.insert()
BinarySearchTree.iterate_inorder()
BinarySearchTree.iterate_leaves()
BinarySearchTree.iterate_nodes_by_depth()
BinarySearchTree.iterate_postorder()
BinarySearchTree.iterate_preorder()
BinarySearchTree.parent_path()
BinarySearchTree.repr_traverse()
Node
- pyutils.collectionz.interval_tree module
- pyutils.collectionz.shared_dict module
- pyutils.collectionz.trie module
- Module contents
- pyutils.compress package
- pyutils.datetimes package
- Submodules
- pyutils.datetimes.constants module
- pyutils.datetimes.dateparse_utils module
- pyutils.datetimes.datetime_utils module
TimeUnit
add_timezone()
date_and_time_to_datetime()
date_to_datetime()
datetime_to_date()
datetime_to_date_and_time()
datetime_to_minute_number()
datetime_to_string()
datetime_to_time()
describe_duration()
describe_duration_briefly()
describe_timedelta()
describe_timedelta_briefly()
easter()
get_format_string()
is_timezone_aware()
is_timezone_naive()
minute_number()
minute_number_to_time_string()
n_timeunits_from_base()
now()
now_pacific()
parse_duration()
replace_time_timezone()
replace_timezone()
seconds_to_timedelta()
string_to_datetime()
strip_timezone()
time_to_datetime_today()
time_to_minute_number()
time_to_string()
timestamp()
timezone_abbrev_to_canonical_name()
timezone_abbrev_to_tz()
translate_timezone()
- Module contents
- pyutils.files package
- Submodules
- pyutils.files.directory_filter module
- pyutils.files.file_utils module
CreateFileWithMode
FileWriter
create_path_if_not_exist()
delete()
describe_file_atime()
describe_file_ctime()
describe_file_mtime()
describe_file_timestamp()
does_directory_exist()
does_file_exist()
does_path_exist()
expand_globs()
fix_multiple_slashes()
get_all_extensions()
get_canonical_path()
get_directories()
get_extension()
get_file_atime_age_seconds()
get_file_atime_as_datetime()
get_file_atime_timedelta()
get_file_ctime_age_seconds()
get_file_ctime_as_datetime()
get_file_ctime_timedelta()
get_file_md5()
get_file_mtime_age_seconds()
get_file_mtime_as_datetime()
get_file_mtime_timedelta()
get_file_raw_atime()
get_file_raw_ctime()
get_file_raw_mtime()
get_file_raw_timestamp()
get_file_raw_timestamps()
get_file_size()
get_files()
get_files_recursive()
get_matching_files()
get_matching_files_recursive()
get_path()
is_directory()
is_executable()
is_normal_file()
is_readable()
is_same_file()
is_symlink()
is_writable()
remove()
remove_hash_comments()
remove_newlines()
set_file_raw_atime()
set_file_raw_atime_and_mtime()
set_file_raw_mtime()
slurp_file()
strip_whitespace()
touch_file()
without_all_extensions()
without_extension()
without_path()
- pyutils.files.lockfile module
- Module contents
- pyutils.parallelize package
- Submodules
- pyutils.parallelize.deferred_operand module
- pyutils.parallelize.executors module
BaseExecutor
BundleDetails
BundleDetails.backup_bundles
BundleDetails.code_file
BundleDetails.controller
BundleDetails.end_ts
BundleDetails.failure_count
BundleDetails.function_name
BundleDetails.is_cancelled
BundleDetails.machine
BundleDetails.pickled_code
BundleDetails.pid
BundleDetails.result_file
BundleDetails.slower_than_global_p95
BundleDetails.slower_than_local_p95
BundleDetails.src_bundle
BundleDetails.start_ts
BundleDetails.username
BundleDetails.uuid
BundleDetails.was_cancelled
BundleDetails.worker
ConfigRemoteWorkerPoolProvider()
ProcessExecutor
RemoteExecutor
RemoteExecutorStatus
RemoteExecutorStatus.periodic_dump()
RemoteExecutorStatus.record_acquire_worker()
RemoteExecutorStatus.record_acquire_worker_already_locked()
RemoteExecutorStatus.record_bundle_details()
RemoteExecutorStatus.record_bundle_details_already_locked()
RemoteExecutorStatus.record_processing_began()
RemoteExecutorStatus.record_release_worker()
RemoteExecutorStatus.record_release_worker_already_locked()
RemoteExecutorStatus.total_idle()
RemoteExecutorStatus.total_in_flight()
RemoteWorkerPoolProvider
RemoteWorkerRecord
RemoteWorkerSelectionPolicy
RoundRobinRemoteWorkerSelectionPolicy
ThreadExecutor
WeightedRandomRemoteWorkerSelectionPolicy
get_remote_workers_filename()
- pyutils.parallelize.parallelize module
- pyutils.parallelize.selectable_event module
- pyutils.parallelize.smart_future module
- pyutils.parallelize.thread_utils module
- Module contents
- pyutils.search package
- pyutils.security package
- pyutils.typez package
- Submodules
- pyutils.typez.centcount module
- pyutils.typez.histogram module
- pyutils.typez.money module
- pyutils.typez.persistent module
- pyutils.typez.rate module
- pyutils.typez.type_utils module
- pyutils.typez.type_hints module
- Module contents
Submodules
pyutils.ansi module
This file mainly contains code for changing the nature of text printed to the console via ANSI escape sequences. For example, it can be used to emit text that is bolded, underlined, italicised, colorized, etc…
It does not contain ANSI escape sequences that do things like move the cursor or record/restore its position. It is focused on text style only.
The file includes a colorizing context that applies color, based on regular expressions and callables, to any data emitted to stdout; this may be useful for adding color to other programs’ output, for instance.
In addition, it contains a mapping from color name to RGB value that it uses to enable friendlier color names in most of its functions. Here is the list of predefined color names it knows:
Note
You can also use raw RGB values with this module so you do not have to use these predefined color names unless you want to.
—
A bunch of color names mapped into RGB tuples and some methods for setting the text color, background color, style, etc… using ANSI escape sequences. See: https://en.wikipedia.org/wiki/ANSI_escape_code.
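As a quick, hedged sketch of typical usage (assuming “red” is among the predefined color names and using the fg(), bold(), and reset() functions documented below):

from pyutils import ansi

# Emit "hello" in red text, then reset all attributes so later output
# is unaffected.
print(ansi.fg("red") + "hello" + ansi.reset())

# Raw RGB channel values work too; here, a bold orange-ish foreground.
print(ansi.bold() + ansi.fg(red=255, green=165, blue=0) + "warning" + ansi.reset())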
- class pyutils.ansi.ProgrammableColorizer(patterns: Iterable[Tuple[Pattern, Callable[[Match[str]], str]]])[source]
Bases:
_StdoutInterceptor
A colorizing interceptor; pass it re.Patterns -> methods that do something (usually add color to) the match. This may be useful for adding color to non-colorized text in a stream without changing the code that emits the text directly. In the example doctest below I’m inserting [RED] and [RESET] strings but you could just as easily insert escape sequences returned from fg(), bg(), and reset().

>>> def red(match: re.Match) -> str:
...     return '[RED]'

>>> def reset(match: re.Match) -> str:
...     return '[RESET]'

>>> with ProgrammableColorizer( [ (re.compile('^[^ ]+'), red),
...                               (re.compile('$'), reset) ] ) as c:
...     c.write("This matches the pattern and will call red()")
...     c.write(" ...this won't")
[RED] matches the pattern and will call red()[RESET] ...this won't[RESET]
Setup the programmable colorizing context; tell it how to operate.
- Parameters:
patterns (Iterable[Tuple[Pattern, Callable[[Match[str]], str]]]) – an iterable collection of tuples. Each tuple has an re.Pattern that describes the text pattern which will trigger the colorization and a method to call when the pattern is matched. These methods receive the re.MATCH object and usually just emit some ANSI escape sequence to colorize the stream. See the example above.
- pyutils.ansi.bg(name: str | None = '', red: int | None = None, green: int | None = None, blue: int | None = None, *, force_16color: bool = False, force_216color: bool = False) str [source]
Returns an ANSI color code for changing the current text background color.
- Parameters:
name (str | None) – the name of the color to set
red (int | None) – the color to set’s red component value
green (int | None) – the color to set’s green component value
blue (int | None) – the color to set’s blue component value
force_16color (bool) – force bg to use 16 color mode
force_216color (bool) – force bg to use 216 color mode
- Returns:
A string containing the requested escape sequence
- Return type:
str
Note
16-color and 216-color spaces can’t be used to represent all colors describable by 8 bit R, G and B channels (i.e. normal R/G/B hex values) If you set the force_16color or force_216color arguments but describe a color (by name or R/G/B) that can’t be represented in the forced color space the code will pick the closest approximation available.
>>> import string_utils as su
>>> su.to_base64(bg("red"))  # b'[48;5;196m'
b'G1s0ODs1OzE5Nm0=\n'
- pyutils.ansi.bg_16color(red: int, green: int, blue: int) str [source]
Set text background color to a color in 16-color space.
- Parameters:
red (int) – the red channel value of background color to set
green (int) – the green channel value of the background color to set
blue (int) – the blue channel value of the background color to set
- Returns:
An ANSI escape sequence that sets the background color to the color described by the red, green and blue parameters in the 16 color space.
- Return type:
str
Note
In 16 color mode, the possible color values are limited to red, green, yellow, blue, purple, cyan, white and black each with or without a “bright” attribute. This function takes R/G/B parameter values that can be used to describe colors that can’t be represented in 16-color space. If such a color is described by the parameters, it maps the color to its closest representation in 16-color space.
This is used by bg() internally but can be invoked directly if needed. See also fg_16color(), bg_216color(), and bg_24bit().
- pyutils.ansi.bg_216color(red: int, green: int, blue: int) str [source]
Set text background color to a color in 216 color space.
- Parameters:
red (int) – the red channel value of the background color to set
green (int) – the green channel value of the background color to set
blue (int) – the blue channel value of the background color to set
- Returns:
An ANSI escape code that sets the background color described by the red, green and blue from the 216 color space.
- Return type:
str
Note
In 216 color mode there are 216 total colors available. This is less than the 16M (256^3) possibilities that can be described by full RGB tuples. When passed colors that are not available in 216 color mode, this code finds the closest match in 216 color space and returns that.
This is used by bg() internally but can be invoked directly if needed. See also bg_16color(), bg_24bit(), and fg_216color().
- pyutils.ansi.bg_24bit(red: int, green: int, blue: int) str [source]
Set text background color to a color in 24-bit color space.
- Parameters:
red (int) – the red channel value of the background color to set
green (int) – the green channel value of the background color to set
blue (int) – the blue channel value of the background color to set
- Returns:
An ANSI escape code that sets the background color described by the red, green and blue from 24-bit color space.
- Return type:
str
Note
In 24-bit color space we can represent any color described by red, green or blue values where 0 <= value <= 255. Values outside of this range will be mapped into the 24-bit color space.
This is used by bg() internally but can be invoked directly if useful. See also bg_216color() and fg_24bit().
- pyutils.ansi.bold() str [source]
Returns: The ANSI escape sequence to set text to bold weight.
- Return type:
str
- pyutils.ansi.clear() str [source]
Returns: An ANSI escape sequence that clears the screen.
- Return type:
str
- pyutils.ansi.clear_line() str [source]
Returns: An ANSI escape sequence that clears the current line from the cursor position to the end of the line.
- Return type:
str
- pyutils.ansi.clear_screen() str [source]
Returns: An ANSI escape sequence that clears the screen.
- Return type:
str
- pyutils.ansi.fg(name: str | None = '', red: int | None = None, green: int | None = None, blue: int | None = None, *, force_16color: bool = False, force_216color: bool = False) str [source]
Return the ANSI escape sequence to change the foreground color text is printed to the console with. Target colors may be indicated either by name or R/G/B values. Result will use the 16 or 216 color scheme if force_16color or force_216color are passed (respectively). Otherwise the code will do what it thinks best.
- Parameters:
name (str | None) – the name of the color to set
red (int | None) – the color to set’s red component value
green (int | None) – the color to set’s green component value
blue (int | None) – the color to set’s blue component value
force_16color (bool) – force fg to use 16 color mode
force_216color (bool) – force fg to use 216 color mode
- Returns:
String containing the ANSI escape sequence to set desired foreground
- Return type:
str
Note
16-color and 216-color spaces can’t be used to represent all colors describable by 8 bit R, G and B channels (i.e. normal R/G/B hex values) If you set the force_16color or force_216color arguments but describe a color (by name or R/G/B) that can’t be represented in the forced color space the code will pick the closest approximation available.
>>> import string_utils as su
>>> su.to_base64(fg('blue'))
b'G1szODs1OzIxbQ==\n'
- pyutils.ansi.fg_16color(red: int, green: int, blue: int) str [source]
Set text foreground color to a color in 16-color space.
- Parameters:
red (int) – the red channel value of the foreground color to set
green (int) – the green channel value of the foreground color to set
blue (int) – the blue channel value of the foreground color to set
- Returns:
An ANSI escape code that sets the foreground color described by the red, green and blue from the 16 color space.
- Return type:
str
Note
In 16 color mode, the possible color values are limited to red, green, yellow, blue, purple, cyan, white and black each with or without a “bright” attribute. This function takes R/G/B parameter values that can be used to describe colors that can’t be represented in 16-color space. If such a color is described by the parameters, it maps the color to its closest representation in 16-color space.
This is used by fg() internally but can be called directly too. See also fg_216color(), fg_24bit(), and bg_16color().
- pyutils.ansi.fg_216color(red: int, green: int, blue: int) str [source]
Set text foreground color to a color in 216 color space.
- Parameters:
red (int) – the red channel value of the foreground color to set
green (int) – the green channel value of the foreground color to set
blue (int) – the blue channel value of the foreground color to set
- Returns:
An ANSI escape code that sets the foreground color described by the red, green and blue from the 216 color space.
- Return type:
str
Note
In 216 color mode there are 216 total colors available. This is less than the 16M (256^3) possibilities that can be described by full RGB tuples. When passed colors that are not available in 216 color mode, this code finds the closest match in 216 color space and returns that.
This is used by fg() internally but can be invoked directly if needed. See also fg_16color(), fg_24bit(), and bg_216color().
- pyutils.ansi.fg_24bit(red: int, green: int, blue: int) str [source]
Set text foreground color to a color in 24-bit color space.
- Parameters:
red (int) – the red channel value of the foreground color to set
green (int) – the green channel value of the foreground color to set
blue (int) – the blue channel value of the foreground color to set
- Returns:
An ANSI escape code that sets the foreground color described by the red, green and blue from 24-bit color space.
- Return type:
str
Note
In 24-bit color space we can represent any color described by red, green or blue values where 0 <= value <= 255. Values outside of this range will be mapped into the 24-bit color space.
This is used by fg() internally but can be invoked directly if useful. See also fg_216color() and bg_24bit().
- pyutils.ansi.italic() str [source]
Returns: The ANSI escape sequence to set text to italics style.
- Return type:
str
- pyutils.ansi.italics() str [source]
Returns: The ANSI escape sequence to set text to italics style.
- Return type:
str
- pyutils.ansi.normal() str [source]
- Returns:
An ANSI escape sequence that resets text attributes to ‘normal’. This sequence ends any different foreground or background color settings. It also ends any special text styling (italics, bold, underline, etc…) that have been previously set.
- Return type:
str
See also reset_bg() and reset_fg().
- pyutils.ansi.pick_contrasting_color(name: str | None = '', red: int | None = None, green: int | None = None, blue: int | None = None) Tuple[int, int, int] [source]
Returns a (red, green, blue) tuple representing a color that contrasts with the given background color, which may be specified either by name or by its red, green, and blue components. This is meant to help ensure that text printed on that background will be visible.
- Parameters:
name (str | None) – the name of the color to contrast
red (int | None) – the color to contrast’s red component value
green (int | None) – the color to contrast’s green component value
blue (int | None) – the color to contrast’s blue component value
- Returns:
An RGB tuple containing a contrasting color
- Return type:
Tuple[int, int, int]
>>> pick_contrasting_color(None, 20, 20, 20)
(255, 255, 255)
>>> pick_contrasting_color("white")
(0, 0, 0)
- pyutils.ansi.reset() str [source]
- Returns:
An ANSI escape sequence that resets text attributes to ‘normal’. This sequence ends any different foreground or background color settings. It also ends any special text styling (italics, bold, underline, etc…) that have been previously set.
- Return type:
str
See also reset_bg() and reset_fg().
- pyutils.ansi.reset_bg()[source]
Returns an ANSI escape sequence that resets text background color to the default but preserves foreground coloring and text attributes like bold, italics, underlines, etc…
- pyutils.ansi.reset_fg()[source]
Returns: an ANSI escape code to reset just the foreground color while preserving the background color and any other formatting (bold, italics, etc…)
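A brief sketch of the difference between reset_fg() and reset() (assuming the color names used here are in the predefined list):

from pyutils import ansi

# reset_fg() drops only the white foreground; the blue background set
# earlier remains until the final reset() clears everything.
print(ansi.bg("blue") + ansi.fg("white") + "white on blue" +
      ansi.reset_fg() + " default fg, still blue bg" + ansi.reset())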
- pyutils.ansi.strike_through() str [source]
Returns: The ANSI escape sequence to set text to strike-through mode.
- Return type:
str
pyutils.argparse_utils module
I use and love the Python internal argparse
module for
commandline argument parsing but found it lacking in some ways. This
module contains code to fill those gaps. See also pyutils.config
.
—
These are helpers for commandline argument parsing meant to work
with Python’s argparse
module from the standard library (See:
https://docs.python.org/3/library/argparse.html). It contains
validators for new argument types (such as free-form dates, durations,
IP addresses, etc…) and an action that creates a pair of flags: one
to disable a feature and another to enable it.
See also pyutils.config.OptionalRawFormatter, which is automatically enabled if you use the config module.
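As a sketch of how these pieces plug into a normal parser (the flag names here are invented for illustration; the validators and ActionNoYes are documented below):

import argparse
from pyutils import argparse_utils

parser = argparse.ArgumentParser()
# Hypothetical flags using a few of the validators described below.
parser.add_argument('--when', type=argparse_utils.valid_date)
parser.add_argument('--timeout', type=argparse_utils.valid_duration)
parser.add_argument('--target', type=argparse_utils.valid_ip)
# ActionNoYes creates both --verbose and --no_verbose.
parser.add_argument('--verbose', action=argparse_utils.ActionNoYes, default=False)
args = parser.parse_args(['--when', '6/5/2021', '--timeout', '3m', '--target', '1.2.3.4'])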
- class pyutils.argparse_utils.ActionNoYes(option_strings: str, dest: str, default: str | None = None, required: bool = False, help: str | None = None)[source]
Bases:
Action
An argparse Action that allows for commandline arguments like this:
cfg.add_argument( '--enable_the_thing', action=ActionNoYes, default=False, help='Should we enable the thing?' )
This creates the following cmdline arguments:
--enable_the_thing --no_enable_the_thing
These arguments can be used to indicate the inclusion or exclusion of binary exclusive behaviors.
- Raises:
ValueError – illegal argument value or combination
- Parameters:
option_strings (str) –
dest (str) –
default (str | None) –
required (bool) –
help (str | None) –
- pyutils.argparse_utils.valid_bool(v: Any) bool [source]
If the string is a valid bool, return its value. Otherwise raise.
- Parameters:
v (Any) – data passed to an argument expecting a bool on the cmdline.
- Returns:
The boolean value of v
- Raises:
ArgumentTypeError – parse error (e.g. not a valid bool string)
- Return type:
bool
Sample usage:
args.add_argument( '--auto', type=argparse_utils.valid_bool, default=None, metavar='True|False', help='Use your best judgement about --primary and --secondary', )
>>> valid_bool(True)
True
>>> valid_bool("true")
True
>>> valid_bool("yes")
True
>>> valid_bool("on")
True
>>> valid_bool("1")
True
>>> valid_bool("off")  # Note: expect False; invalid would raise.
False
>>> valid_bool(12345)
Traceback (most recent call last):
...
argparse.ArgumentTypeError: 12345
- pyutils.argparse_utils.valid_byte_count(txt: str) int [source]
If the string is a valid number of bytes, return an integer representing the requested byte count. This method uses string_utils.suffix_string_to_number() to parse and accepts / understands:
plain numbers (123456)
numbers with ISO suffixes (Mb, Gb, Pb, etc…)
- Parameters:
txt (str) – data passed to a commandline arg expecting a duration.
- Returns:
An integer number of bytes.
- Raises:
ArgumentTypeError – parse error (e.g. byte count not parsable)
- Return type:
int
Sample usage:
cfg.add_argument( '--max_file_size', type=argparse_utils.valid_byte_count, default=(1024 * 1024), metavar='NUM_BYTES', help='The largest file we may create', )
>>> valid_byte_count('1Mb')
1048576
>>> valid_byte_count("1234567")
1234567
>>> valid_byte_count("1M")
1048576
>>> valid_byte_count("1.2Gb")
1288490188
>>> valid_byte_count('1.2')  # <--- contains a decimal
Traceback (most recent call last):
...
argparse.ArgumentTypeError: Invalid byte count: 1.2
>>> valid_byte_count(1234567)  # <--- not a string
Traceback (most recent call last):
...
argparse.ArgumentTypeError: Invalid byte count: 1234567
>>> valid_byte_count('On a dark and stormy night')
Traceback (most recent call last):
...
argparse.ArgumentTypeError: Invalid byte count: On a dark and stormy night
- pyutils.argparse_utils.valid_date(txt: str) date [source]
If the string is a valid date, return it. Otherwise raise an ArgumentTypeError.
- Parameters:
txt (str) – data passed to a commandline flag expecting a date.
- Returns:
the datetime.date described by txt
- Raises:
ArgumentTypeError – parse error (e.g. date not valid)
- Return type:
date
Sample usage:
cfg.add_argument( "--date", nargs=1, type=argparse_utils.valid_date, metavar="DATE STRING", default=None )
>>> valid_date('6/5/2021')
datetime.date(2021, 6, 5)
Note
Dates like ‘next wednesday’ work fine, they are just hard to doctest for without knowing when the testcase will be executed… See pyutils.datetimes.dateparse_utils for other examples of usable expressions.
>>> valid_date('next wednesday')
-ANYTHING-
- pyutils.argparse_utils.valid_datetime(txt: str) datetime [source]
If the string is a valid datetime, return it. Otherwise raise an ArgumentTypeError.
- Parameters:
txt (str) – data passed to a commandline flag expecting a valid datetime.datetime.
- Returns:
The datetime.datetime described by txt
- Raises:
ArgumentTypeError – parse error (e.g. invalid datetime string)
- Return type:
datetime
Sample usage:
cfg.add_argument( "--override_timestamp", nargs=1, type=argparse_utils.valid_datetime, help="Don't use the current datetime, override to argument.", metavar="DATE/TIME STRING", default=None, )
>>> valid_datetime('6/5/2021 3:01:02')
datetime.datetime(2021, 6, 5, 3, 1, 2)
>>> valid_datetime('Sun Dec 11 11:50:00 UTC 2022')
datetime.datetime(2022, 12, 11, 11, 50)
Note
Because this code uses an English date-expression parsing grammar internally, much more complex datetimes can be expressed in free form. See pyutils.datetimes.dateparse_utils for details. These are not included here because they are hard to write valid doctests for!
>>> valid_datetime('next christmas at 4:15am')
-ANYTHING-
- pyutils.argparse_utils.valid_duration(txt: str) timedelta [source]
If the string is a valid time duration, return a datetime.timedelta representing the duration described. This uses datetime_utils.parse_duration to parse durations and expects data such as:
15 days, 3 hours, 15 minutes
15 days 3 hours 15 minutes
15d 3h 15m
15d3h5m
3m 2s
1000s
If the duration is not parsable, raise an ArgumentTypeError.
- Parameters:
txt (str) – data passed to a commandline arg expecting a duration.
- Returns:
The datetime.timedelta described by txt.
- Raises:
ArgumentTypeError – parse error (e.g. invalid duration string)
- Return type:
timedelta
Sample usage:
cfg.add_argument( '--ip_cache_max_staleness', type=argparse_utils.valid_duration, default=datetime.timedelta(seconds=60 * 60 * 4), metavar='DURATION', help='Max acceptable age of the IP address cache' )
>>> valid_duration('15d3h5m')
datetime.timedelta(days=15, seconds=11100)
>>> valid_duration('15 days 3 hours 5 min')
datetime.timedelta(days=15, seconds=11100)
>>> valid_duration('3m')
datetime.timedelta(seconds=180)
>>> valid_duration('3 days, 2 hours')
datetime.timedelta(days=3, seconds=7200)
>>> valid_duration('a little while')
Traceback (most recent call last):
...
argparse.ArgumentTypeError: a little while is not a valid duration.
- pyutils.argparse_utils.valid_filename(filename: str) str [source]
If the string contains a valid filename that exists on the filesystem, return it. Otherwise raise an ArgumentTypeError.
Note
This method will accept directories that exist on the filesystem in addition to files.
- Parameters:
filename (str) – data passed to a flag expecting a valid filename.
- Returns:
The filename if valid, otherwise raises ArgumentTypeError.
- Raises:
ArgumentTypeError – parse error (e.g. file doesn’t exist)
- Return type:
str
Sample usage:
args.add_argument( '--network_mac_addresses_file', default='/home/scott/bin/network_mac_addresses.txt', metavar='FILENAME', help='Location of the network_mac_addresses file (must exist!).', type=argparse_utils.valid_filename, )
>>> valid_filename('/tmp')
'/tmp'
>>> valid_filename('wfwefwefwefwefwefwefwefwef')
Traceback (most recent call last):
...
argparse.ArgumentTypeError: wfwefwefwefwefwefwefwefwef was not found and is therefore invalid.
- pyutils.argparse_utils.valid_ip(ip: str) str [source]
If the string is a valid IPv4 address, return it. Otherwise raise an ArgumentTypeError.
- Parameters:
ip (str) – data passed to a commandline arg expecting an IP(v4) address.
- Returns:
The IP address, if valid.
- Raises:
ArgumentTypeError – parse error (e.g. not a valid IP address string)
- Return type:
str
Sample usage:
args.add_argument( "-i", "--ip_address", metavar="TARGET_IP_ADDRESS", help="Target IP Address", type=argparse_utils.valid_ip, )
>>> valid_ip("1.2.3.4")
'1.2.3.4'
>>> valid_ip("localhost")
Traceback (most recent call last):
...
argparse.ArgumentTypeError: localhost is an invalid IP address
- pyutils.argparse_utils.valid_mac(mac: str) str [source]
If the string is a valid MAC address, return it. Otherwise raise an ArgumentTypeError.
- Parameters:
mac (str) – a value passed to a commandline flag expecting a MAC address.
- Returns:
The MAC address passed
- Raises:
ArgumentTypeError – parse error (e.g. not a valid MAC address)
- Return type:
str
Sample usage:
group.add_argument( "-m", "--mac", metavar="MAC_ADDRESS", help="Target MAC Address", type=argparse_utils.valid_mac, )
>>> valid_mac('12:23:3A:4F:55:66')
'12:23:3A:4F:55:66'
>>> valid_mac('12-23-3A-4F-55-66')
'12-23-3A-4F-55-66'
>>> valid_mac('big')
Traceback (most recent call last):
...
argparse.ArgumentTypeError: big is an invalid MAC address
- pyutils.argparse_utils.valid_percentage(num: str) float [source]
If the string is a valid (0 <= n <= 100) percentage, return it. Otherwise raise an ArgumentTypeError.
- Arg:
num: data passed to a flag expecting a percentage with a value between 0 and 100 inclusive.
- Returns:
The number if valid, otherwise raises ArgumentTypeError.
- Raises:
ArgumentTypeError – parse error (e.g. not a valid percentage)
- Parameters:
num (str) –
- Return type:
float
Sample usage:
args.add_argument( '--percent_change', type=argparse_utils.valid_percentage, default=0, help='The percent change (0<=n<=100) of foobar', )
>>> valid_percentage("15%")
15.0
>>> valid_percentage('40')
40.0
>>> valid_percentage('115')
Traceback (most recent call last):
...
argparse.ArgumentTypeError: 115 is an invalid percentage; expected 0 <= n <= 100.0
pyutils.bootstrap module
The bootstrap module defines a decorator meant to wrap your main function. This is optional, of course: you can use this library without using the bootstrap decorator on your main. If you choose to use it, though, it will do some work for you automatically.
—
If you decorate your main method (i.e. program entry point) like this:
@bootstrap.initialize
def main():
    whatever
…you will get:
- automatic support for pyutils.config (argument parsing, see that module for details),
- the ability to break into pdb on unhandled exceptions (which is enabled/disabled via the commandline flag --debug_unhandled_exceptions),
- automatic logging support from pyutils.logging_utils controllable via several commandline flags,
- the ability to optionally enable whole-program code profiling and reporting when you run your code using the commandline flag --run_profiler,
- the ability to optionally enable import auditing via the commandline flag --audit_import_events. This logs a message whenever a module is imported after the bootstrap module itself is loaded. Note that other modules may already be loaded when bootstrap is loaded and these imports will not be logged. If you’re trying to debug import events or dependency problems, I suggest putting bootstrap very early in your import list and using this flag.
- optional memory profiling for your program set via the commandline flag --trace_memory. This provides a report of python memory utilization at program termination time.
- the ability to set the global random seed via commandline flag for reproducible runs (as long as subsequent code doesn’t reset the seed) using the --set_random_seed flag,
- automatic program timing and reporting logged to the INFO log,
- more verbose error handling and reporting.
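Putting it together, here is a minimal sketch of a decorated entry point (the program body and the flag values in the comments are hypothetical; the flags themselves are the ones listed above):

#!/usr/bin/env python3
from pyutils import bootstrap

@bootstrap.initialize
def main() -> None:
    # With --debug_unhandled_exceptions, an uncaught exception raised
    # here would drop into pdb instead of just crashing.
    print("doing the thing")

if __name__ == '__main__':
    main()

# Hypothetical invocations exercising the flags described above:
#   ./program.py --run_profiler
#   ./program.py --trace_memory
#   ./program.py --set_random_seed 12345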
- class pyutils.bootstrap.ImportInterceptor[source]
Bases:
MetaPathFinder
An interceptor that always allows module load events but dumps a record into the log and onto stdout when modules are loaded and produces an audit of who imported what at the end of the run. It can’t see any load events that happen before it, though, so move bootstrap up in your __main__’s import list just temporarily to get a good view.
- find_module(fullname, path) NoReturn [source]
Return a loader for the module.
If no module is found, return None. The fullname is a str and the path is a list of strings or None.
This method is deprecated since Python 3.4 in favor of finder.find_spec(). If find_spec() exists then backwards-compatible functionality is provided for this method.
- Return type:
NoReturn
- pyutils.bootstrap.dump_all_objects() None [source]
Helper code to dump all known python objects.
- Return type:
None
pyutils.config module
The config module is an opinionated way to set up input parameters to
your program. It is enabled by using the pyutils.bootstrap
decorator around your main entry point or by simply calling
pyutils.config.parse()
early in main (which is what
pyutils.bootstrap.initialize()
does for you).
If you use this module, input parameters to your program come from
the commandline (and are configured using Python’s argparse
).
But they can also be augmented or replaced
files stored either on the local filesystem or on Apache Zookeeper.
There is a provision for enabling dynamic arguments (i.e. that can change
during runtime) via Zookeeper (which is disabled by default).
—
Global program configuration driven by commandline arguments and, optionally, from saved (local or Zookeeper) configuration files… with optional support for dynamic arguments (i.e. that can change during runtime).
Let’s start with an example of how to use pyutils.config. It’s pretty easy for normal commandline arguments because it wraps argparse (see https://docs.python.org/3/library/argparse.html):

In your file.py:

from pyutils import config

# Call add_commandline_args to get an argparse.ArgumentParser
# for file.py.  Each file uses a separate ArgumentParser
# chained off the main namespace.
parser = config.add_commandline_args(
    "Module",
    "Args related to module doing the thing.",
)

# Then simply add argparse-style arguments to it, as usual.
parser.add_argument(
    "--module_do_the_thing",
    type=bool,
    default=True,
    help="Should the module do the thing?"
)

In your main.py:

from pyutils import config

# main.py may have some arguments of its own, so add them.
parser = config.add_commandline_args(
    "Main",
    "A program that does the thing.",
)
parser.add_argument(
    "--dry_run",
    type=bool,
    default=False,
    help="Should we really do the thing?"
)

def main() -> None:
    config.parse()   # Then remember to call config.parse() early on.

If you set this up and remember to invoke pyutils.config.parse(), all commandline arguments will play nicely together across all modules / files in your program automatically. Argparse help messages will group flags by the file they affect.

If you use pyutils.bootstrap.initialize(), a decorator that can optionally wrap your program’s entry point, it will remember to call pyutils.config.parse() for you so you can omit the last part. That looks like this:

from pyutils import bootstrap

@bootstrap.initialize
def main():
    whatever

if __name__ == '__main__':
    main()

Either way, you’ll get an aggregated usage message along with flags broken down per file in help:

% main.py -h
usage: main.py [-h]
               [--module_do_the_thing MODULE_DO_THE_THING]
               [--dry_run DRY_RUN]

Module:
  Args related to module doing the thing.

  --module_do_the_thing MODULE_DO_THE_THING
        Should the module do the thing?

Main:
  A program that does the thing

  --dry_run
        Should we really do the thing?

Once pyutils.config.parse() has been called (either automatically by pyutils.bootstrap or manually), the program configuration state is ready in a dict-like object called config.config. For example, to check the state of the --dry_run flag:

if not config.config['dry_run']:
    module.do_the_thing()

Using pyutils.config allows you to “save” and “load” whole sets of commandline arguments using the --config_savefile and --config_loadfile arguments. The former saves all arguments (other than itself) to an ascii file whose path you provide. The latter reads all arguments from an ascii file whose path you provide.

Saving and loading sets of arguments can make complex operations easier to set up. They also allow for dynamic arguments.

If you use Apache Zookeeper, you can prefix paths to --config_savefile and --config_loadfile with the string “zk:” to cause the path to be interpreted as a Zookeeper path rather than one on the local filesystem. When loading arguments from Zookeeper, the pyutils.config code registers a listener to be notified on state change (e.g. when some other instance overwrites your Zookeeper based configuration). Listeners then dynamically update the value of any flag in the config.config dict whose name contains the string “dynamic”. So, for example, the --dynamic_database_connect_string argument would be modifiable at runtime when using Zookeeper based configurations. Flags that do not contain the string “dynamic” will not change. And nothing is dynamic unless we’re reading configuration from Zookeeper.

For more information about Zookeeper, see https://zookeeper.apache.org/.
- class pyutils.config.Config[source]
Bases:
object
Warning
Do not instantiate this class directly; it is meant to be a global singleton called pyutils.config.CONFIG. Instead, use pyutils.config.add_commandline_args() to get an ArgumentGroup and add your arguments to it. Then call pyutils.config.parse() to parse global configuration from your main program entry point.
Everything in the config module used to be module-level functions and variables but it made the code ugly and harder to maintain. Now, this class does the heavy lifting. We still rely on some globals, though:
ARGS and GROUP to interface with argparse
PROGRAM_NAME stores argv[0] close to program invocation
ORIG_ARGV stores the original argv list close to program invocation
CONFIG and config: hold the (singleton) instance of this class.
- static add_commandline_args(title: str, description: str = '') _ArgumentGroup [source]
Create a new context for arguments and return an ArgumentGroup to the caller for module-level population.
- Parameters:
title (str) – A title for your module’s commandline arguments group.
description (str) – A helpful description of your module.
- Returns:
An argparse._ArgumentGroup to be populated by the caller.
- Return type:
_ArgumentGroup
- get(key: str, default: Any = None) Any | None [source]
- Parameters:
key (str) –
default (Any) –
- Return type:
Any | None
- has_been_parsed() bool [source]
Returns True iff the global config has already been parsed
- Return type:
bool
- static is_flag_already_in_argv(var: str) bool [source]
- Returns:
True if a particular flag is passed on the commandline and False otherwise.
- Parameters:
var (str) – The flag to search for.
- Return type:
bool
- static overwrite_argparse_epilog(msg: str) None [source]
Allows your code to override the default epilog created by argparse.
- Parameters:
msg (str) – The epilog message to substitute for the default.
- Return type:
None
- parse(entry_module: str | None) Dict[str, Any] [source]
Main program should invoke this early in main(). Note that the pyutils.bootstrap.initialize() wrapper takes care of this automatically. This should only be called once per program invocation.
- Parameters:
entry_module (str | None) – Optional string to ensure we understand which module contains the program entry point. Determined heuristically if not provided.
- Returns:
- A dict containing the parsed program configuration. Note that this can
be safely ignored since it is also saved in config.config and may be used directly using that identifier.
- Raises:
PyUtilsUnrecognizedArgumentsException – if unrecognized config argument(s) are detected and the --config_rejects_unrecognized_arguments argument is enabled.
- Return type:
Dict[str, Any]
- class pyutils.config.OptionalRawFormatter(prog, indent_increment=2, max_help_position=24, width=None)[source]
Bases:
HelpFormatter
This formatter has the same behavior as the normal argparse text formatter except when the help text of an argument begins with “RAW|”. In that case, the line breaks are preserved and the text is not wrapped. It is enabled automatically if you use pyutils.config.
Use this by prepending “RAW|” to your help message to disable word wrapping and indicate that the help message is already formatted and should be preserved. Here’s an example usage:
args.add_argument(
    '--mode',
    type=str,
    default='PLAY',
    choices=['CHEAT', 'AUTOPLAY', 'SELFTEST', 'PRECOMPUTE', 'PLAY'],
    metavar='MODE',
    help='''RAW|Our mode of operation.  One of:

        PLAY = play wordle with me!  Pick a random solution or
               specify a solution with --template.

       CHEAT = given a --template and, optionally, --letters_in_word
               and/or --letters_to_avoid, return the best guess word;

    AUTOPLAY = given a complete word in --template, guess it step
               by step showing work;

    SELFTEST = autoplay every possible solution keeping track of
               wins/losses and average number of guesses;

  PRECOMPUTE = populate hash table with optimal guesses.
    ''',
)
- pyutils.config.add_commandline_args(title: str, description: str = '') _ArgumentGroup [source]
Create a new context for arguments and return a handle. An alias for config.config.add_commandline_args.
- Parameters:
title (str) – A title for your module’s commandline arguments group.
description (str) – A helpful description of your module.
- Returns:
An argparse._ArgumentGroup to be populated by the caller.
- Return type:
_ArgumentGroup
- pyutils.config.argv_after_parse() List[str] | None [source]
Return the argv with all known arguments removed.
- Return type:
List[str] | None
- pyutils.config.error(message: str, exit_code: int = 1) None [source]
Convenience method for indicating a configuration error.
- Parameters:
message (str) –
exit_code (int) –
- Return type:
None
- pyutils.config.has_been_parsed() bool [source]
Returns True iff the global config has already been parsed
- Return type:
bool
- pyutils.config.is_flag_already_in_argv(var: str) bool [source]
Returns true if a particular flag is passed on the commandline and false otherwise.
- Parameters:
var (str) – The flag to search for.
- Return type:
bool
- pyutils.config.late_logging() None [source]
Log messages saved earlier now that logging has been initialized.
- Return type:
None
- pyutils.config.overwrite_argparse_epilog(msg: str) None [source]
Allows your code to override the default epilog created by argparse.
- Parameters:
msg (str) – The epilog message to substitute for the default.
- Return type:
None
- pyutils.config.parse(entry_module: str | None) Dict[str, Any] [source]
Main program should call this early in main(). Note that the bootstrap.initialize wrapper takes care of this automatically. This should only be called once per program invocation. Subsequent calls do not reparse the configuration settings but rather just return the current state.
- Parameters:
entry_module (str | None) –
- Return type:
Dict[str, Any]
- pyutils.config.print_usage() None [source]
Prints the normal help usage message out.
- Return type:
None
pyutils.dataclass_utils module
Utilities for dealing with Dataclasses. A non-official type hint and some friendly wrappers around conversion to/from Dicts.
- pyutils.dataclass_utils.dataclass_from_dict(dataclass: type, d: Dict[str, Any]) Dataclass [source]
Given a Dataclass type and a dict, return a populated instance.
- Parameters:
dataclass (type) – the Class type to return an instance of
d (Dict[str, Any]) – the dict to be used to populate the new instance
- Returns:
A constructed and populated dataclass instance.
- Return type:
Dataclass
>>> from dataclasses import dataclass
>>> from datetime import date
>>> @dataclass
... class Record:
...     name: str
...     phone: str
...     address: str
...     age: int
...     member_since: date
...
>>> d = {
...     'name': 'John Smith',
...     'phone': '555-1234',
...     'address': '994 Main St.',
...     'age': 26,
...     'member_since': date(2006, 5, 14),
... }
>>> dataclass_from_dict(Record, d)
Record(name='John Smith', phone='555-1234', address='994 Main St.', age=26, member_since=datetime.date(2006, 5, 14))
- pyutils.dataclass_utils.dataclass_to_dict(dataclass: Dataclass) Dict[str, Any] [source]
- Returns:
A dict-representation of a valid dataclass.
- Parameters:
dataclass (Dataclass) –
- Return type:
Dict[str, Any]
>>> from dataclasses import dataclass
>>> from datetime import date
>>> @dataclass
... class Record:
...     name: str
...     phone: str
...     address: str
...     age: int
...     member_since: date
...
>>> r = Record(name='Jane Doe', phone='555-1232', address='998 Main St.', age=23, member_since=date(2008, 3, 1))
>>> dataclass_to_dict(r)
{'name': 'Jane Doe', 'phone': '555-1232', 'address': '998 Main St.', 'age': 23, 'member_since': datetime.date(2008, 3, 1)}
pyutils.decorator_utils module
This is a grab bag of, hopefully, useful decorators.
- class pyutils.decorator_utils.DelayWhen(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
IntEnum
This enum is used with the @delay decorator to indicate that the delay should happen before wrapped function invocation, after wrapped function invocation, or both.
See: delay().
- AFTER_CALL = 2
- BEFORE_AND_AFTER = 3
- BEFORE_CALL = 1
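For instance, a small sketch pairing the enum with the delay() decorator described below:

from pyutils import decorator_utils

# Sleep half a second both before and after every call to poke().
@decorator_utils.delay(seconds=0.5, when=decorator_utils.DelayWhen.BEFORE_AND_AFTER)
def poke() -> None:
    print("poke")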
- pyutils.decorator_utils.call_probabilistically(probability_of_call: float) Callable [source]
Calls the wrapped function probabilistically given a rate between 0.0 and 1.0 inclusive (0% probability and 100% probability).
- Parameters:
probability_of_call (float) – probability with which to invoke the wrapped function. Must be 0 <= probability <= 1.0.
- Raises:
ValueError – invalid probability argument
- Return type:
Callable
Example usage… this example would skip the invocation of log_the_entire_request_message 95% of the time and only invoke it 5% of the time:
@call_probabilistically(0.05)
def log_the_entire_request_message(message: Whatever):
    expensive work to save message to the log
- pyutils.decorator_utils.debug_args(func: Callable) Callable [source]
Print the function signature and return value at each call.
>>> @debug_args
... def foo(a, b, c):
...     print(a)
...     print(b)
...     print(c)
...     return (a + b, c)
>>> foo(1, 2.0, "test")
Calling foo(1:<class 'int'>, 2.0:<class 'float'>, 'test':<class 'str'>)
1
2.0
test
foo returned (3.0, 'test'):<class 'tuple'>
(3.0, 'test')
- Parameters:
func (Callable) –
- Return type:
Callable
- pyutils.decorator_utils.debug_count_calls(func: Callable) Callable [source]
Count function invocations and print a message before every call.
>>> @debug_count_calls
... def factoral(x):
...     if x == 1:
...         return 1
...     return x * factoral(x - 1)
>>> factoral(5)
Call #1 of 'factoral'
Call #2 of 'factoral'
Call #3 of 'factoral'
Call #4 of 'factoral'
Call #5 of 'factoral'
120
- Parameters:
func (Callable) –
- Return type:
Callable
- pyutils.decorator_utils.decorate_matching_methods_with(decorator: Callable, acl: Callable | None = None)[source]
Apply the given decorator to all methods in a class whose names match the given acl. If acl is None (the default), decorate all methods in the class.
- Parameters:
decorator (Callable) – the decorator to apply to matching class methods.
acl (Callable | None) – the matcher used to predicate decorator application; None, the default, applies the decorator to all class methods. See
pyutils.security.acl
for more information and options.
Example usage to wrap all methods whose names begin with either “enter” or “exit” with the @invocation_logged decorator (see invocation_logged()):

import pyutils.decorator_utils
import pyutils.security.acl as acl

@decorator_utils.decorate_matching_methods_with(
    decorator_utils.invocation_logged,
    acl.StringWildcardBasedACL(
        allowed_patterns=['enter*', 'exit*'],
        acl.Order.ALLOW_DENY
    )
)
class MyClass:
    def __init__(self):
        self.name = None
        self.rating = None

    def __repr__(self) -> str:
        return f'{self.name} @ {self.rating}'

    def enterName(self, n: str) -> None:
        if len(n) > 5:
            self.name = n

    def exitName(self, n: str) -> None:
        pass

    def enterRating(self, r: int) -> None:
        if 1 <= r <= 5:
            self.rating = r

    def exitRating(self, r: int) -> None:
        pass
- pyutils.decorator_utils.delay(_func: Callable | None = None, *, seconds: float = 1.0, when: DelayWhen = DelayWhen.BEFORE_CALL) Callable [source]
Slow down a function by inserting a delay before and/or after its invocation.
- Parameters:
seconds (float) – how long should we delay (via a simple time.sleep())?
when (DelayWhen) – when should we delay.. before the invocation, after it, or both?
_func (Callable | None) –
- Return type:
Callable
>>> @delay(seconds=1.0)
... def foo():
...     pass
>>> import time
>>> start = time.time()
>>> foo()
>>> dur = time.time() - start
>>> dur >= 1.0
True
- pyutils.decorator_utils.deprecated(func)[source]
This is a decorator which can be used to mark functions as deprecated. It will result in a warning being emitted when the function is used. The warning includes the caller as determined by examining the stack in the warning log.
>>> @deprecated
... def foo() -> None:
...     pass
>>> foo()   # prints + logs "Call to deprecated function foo"
- pyutils.decorator_utils.invocation_logged(func: Callable) Callable [source]
Log the call of a function on sys.stdout and the info log.
>>> @invocation_logged
... def foo():
...     print('Hello, world.')
>>> foo()
Entered foo
Hello, world.
Exited foo
- Parameters:
func (Callable) –
- Return type:
Callable
- pyutils.decorator_utils.jittery_delay_helper(delay: float) None [source]
- Parameters:
delay (float) –
- Return type:
None
- pyutils.decorator_utils.memoized(func: Callable) Callable [source]
Keep a cache of previous function call results. Use this with pure functions without side effects that do expensive work.
The internal cache is a simple dict with a key based on the arguments to the call so the result of the function must be determined only by its parameters (i.e. it must be “functional”) or this will introduce errors. See: https://en.wikipedia.org/wiki/Functional_programming#Pure_functions
Consider also: functools.cache() for a more advanced implementation. See: https://docs.python.org/3/library/functools.html#functools.cache

>>> import time
>>> @memoized
... def expensive(arg) -> int:
...     # Simulate something slow to compute or lookup, like a
...     # computationally expensive task or a network read of
...     # static data (i.e. that should never change).
...     time.sleep(1.0)
...     return arg * arg
>>> start = time.time()
>>> expensive(5)     # Takes about 1 sec
25
>>> expensive(3)     # Also takes about 1 sec
9
>>> expensive(5)     # Pulls from cache, fast
25
>>> expensive(3)     # Pulls from cache again, fast
9
>>> dur = time.time() - start
>>> dur < 3.0
True
- Parameters:
func (Callable) –
- Return type:
Callable
- pyutils.decorator_utils.normal_delay_helper(delay: float) None [source]
- Parameters:
delay (float) –
- Return type:
None
- pyutils.decorator_utils.predicated_retry_with_backoff(tries: int, *, predicate: Callable[[...], bool], delay_sec: float = 3.0, backoff: float = 2.0, on_attempt: Callable[[...], None] | None = None, on_success: Callable[[...], None] | None = None, on_failure: Callable[[...], None] | None = None, delay_helper: Callable[[float], None] | None = None, raise_on_repeated_failures: bool = False)[source]
Retries a function or method up to a certain number of times with a prescribed initial delay period and backoff rate (multiplier). Note that retry_if_false() and retry_if_none() both use this class with a predefined predicate but you can also use it directly with your own custom predicate.
- Parameters:
tries (int) – the maximum number of attempts to run the function
delay_sec (float) – sets the initial delay period in seconds
backoff (float) – a multiplier (must be >=1.0) used to modify the delay at each subsequent invocation
predicate (Callable[[...], bool]) – a Callable that will be passed the retval of the decorated function and must return True to indicate that we should stop calling or False to indicate a retry is necessary
on_attempt (Callable[[...], None] | None) – an optional callable to be invoked at each attempt
on_success (Callable[[...], None] | None) – an optional callable to be invoked on success
on_failure (Callable[[...], None] | None) – an optional callable to be invoked on failure
raise_on_repeated_failures (bool) – if True, raise a PyUtilsException if the wrapped function never succeeds (as indicated by the predicate). Otherwise simply returns the final error result.
delay_helper (Callable[[float], None] | None) –
- Raises:
ValueError – on invalid arguments; e.g. backoff must be >= 1.0, delay_sec must be >= 0.0, tries must be > 0.
PyUtilsException – if raise_on_repeated_failures is True and the wrapped function fails tries times. Otherwise simply returns the final error result.
Example usage that would call make_the_RPC_call up to three times (as long as it returns a tuple with False in the second element) with a delay of 1.0s the first time, 2.0s the second time, and 4.0s the third time.:
@decorator_utils.predicated_retry_with_backoff(
    3,
    predicate=lambda _: _[2] is False,
    delay_sec=1.0,
    backoff=2
)
def make_the_RPC_call() -> Tuple[str, int, bool]:
    whatever
- pyutils.decorator_utils.rate_limited(n_calls: int, *, per_period_in_seconds: float = 1.0) Callable [source]
Limit invocation of a wrapped function to n calls per time period. Thread-safe. In testing this was relatively fair with multiple threads using it though that hasn’t been measured in detail.
Note
The doctest below makes use of pyutils.parallelize.thread_utils.background_thread. See that class’ documentation for details.

>>> import time
>>> from pyutils import decorator_utils
>>> from pyutils.parallelize import thread_utils
>>> calls = 0
>>> @decorator_utils.rate_limited(10, per_period_in_seconds=1.0)
... def limited(x: int):
...     global calls
...     calls += 1
>>> @thread_utils.background_thread
... def a(stop):
...     for _ in range(3):
...         limited(_)
>>> @thread_utils.background_thread
... def b(stop):
...     for _ in range(3):
...         limited(_)
>>> start = time.time()
>>> (thread1, event1) = a()
>>> (thread2, event2) = b()
>>> thread1.join()
>>> thread2.join()
>>> end = time.time()
>>> dur = end - start
>>> dur > 0.5
True
>>> calls
6
- Parameters:
n_calls (int) –
per_period_in_seconds (float) –
- Return type:
Callable
- pyutils.decorator_utils.retry_if_false(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0)[source]
A helper for @predicated_retry_with_backoff that retries a decorated function as long as it keeps returning False.
- Parameters:
tries (int) – max number of times to retry
delay_sec (float) – initial delay before retry length in seconds
backoff (float) – a multiplier (must be >= 1.0) used to optionally increase subsequent delays on repeated failures.
Note
If after tries attempts the wrapped function is still failing, this code returns the failure result (i.e. False) to the caller.
>>> import time
>>> counter = 0
>>> @retry_if_false(5, delay_sec=1.0, backoff=1.1)
... def foo():
...     global counter
...     counter += 1
...     return counter >= 3
>>> start = time.time()
>>> foo()   # fail, delay 1.0, fail, delay 1.1, succeed
True
>>> dur = time.time() - start
>>> counter
3
>>> dur > 2.0
True
>>> dur < 2.3
True
- pyutils.decorator_utils.retry_if_none(tries: int, *, delay_sec: float = 3.0, backoff: float = 2.0)[source]
A helper for @predicated_retry_with_backoff that continues to invoke the wrapped function as long as it keeps returning None. Retries up to N times with a delay between each retry and a backoff that can increase the delay.
- Parameters:
tries (int) – max number of times to retry
delay_sec (float) – the initial delay, in seconds, before retrying
backoff (float) – a multiplier (must be >= 1.0) used to optionally increase subsequent delays on repeated failures.
Note
If after tries attempts the wrapped function is still failing, this code returns the failure result (i.e. None) to the caller.
Example usage… calls a function that reads a URL from the network and returns the raw HTTP response (or None on error), with up to three retries and an increasing backoff:
@retry_if_none(3, delay_sec=1.0, backoff=4.0)
def fetch_the_image(url: str) -> Optional[bytes]:
    r = requests.get(url)
    if r.status_code != 200:
        return None
    return r.content

# Use normally
image_binary_data = fetch_the_image(
    'https://www.whatever.com/foo/bar/baz.jpg'
)

# Note: even with retries this might still fail; be prepared
# to still receive a None return value.
if image_binary_data is None:
    raise Exception(f"Couldn't read {url}?!")
- pyutils.decorator_utils.singleton(cls)[source]
A singleton decorator. Adding this to a class ensures that only one instance of that class exists globally in the program: the first construction creates the instance and every subsequent construction request returns that previously created singleton instance.
See also
pyutils.persistent.persistent_autoloaded_singleton().
>>> @singleton
... class global_configuration(object):
...     pass
>>> a = global_configuration()
>>> b = global_configuration()
>>> a is b
True
>>> id(a) == id(b)
True
- pyutils.decorator_utils.synchronized(_func=None, *, lock: None | allocate_lock | RLock = None) Callable [source]
Emulates Java’s “synchronized” keyword: given a lock, require that threads take that lock (or wait) before invoking the wrapped function, and automatically release the lock afterwards.
- Parameters:
lock (None | allocate_lock | RLock) – the lock that must be held to invoke the wrapped function.
- Return type:
Callable
Example usage. Imagine we have shared state between multiple threads or processes and, to update the shared state, code should take a lock to ensure only one writer is modifying the state at a time. Any kind of Python lock that has an acquire method can be used with the @synchronized decorator, which will handle acquisition and release automatically:
import threading

lock = threading.Lock()

@synchronized(lock)
def update_shared_state():
    ...  # do some work
Note
If you pass no lock, a default lock will be used. This default lock is reentrant. e.g.:
@synchronized
def do_something_single_threaded():
    ...  # whatever
- pyutils.decorator_utils.thunkify(func)[source]
Make a function immediately return a function of no args which, when called, waits for the original result. Meanwhile spin up a background thread to begin computing the result in parallel.
Example usage… hide a slow network read behind a thunk that will block only when it is called:
@thunkify
def read_url(url: str) -> Result:
    ...  # make a slow network read

urls = [ ... ]  # a long list of urls
results = []
for url in urls:
    results.append(read_url(url))
In this example, we will start one background thread per url(!!) requested. The result of read_url is no longer a Result but rather a Callable (see thunk below) that, when invoked, awaits the Result and returns it.
For more control over things like the number of worker threads and the ability to cause work to be done on background processes or even on other machines, see pyutils.parallelize.SmartFuture, pyutils.parallelize.DeferredOperation and pyutils.parallelize.parallelize.
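A runnable sketch of the thunkify pattern above (slow_double and its 0.5s delay are invented for illustration; only thunkify itself comes from the library):
import time

from pyutils import decorator_utils

@decorator_utils.thunkify
def slow_double(x: int) -> int:
    time.sleep(0.5)  # stand-in for a slow network or disk read
    return x * 2

# Each call returns immediately; the real work happens on background threads.
thunks = [slow_double(n) for n in range(4)]

# Calling a thunk blocks until its result is ready.
results = [thunk() for thunk in thunks]
print(results)  # [0, 2, 4, 6] after roughly 0.5s total, not 2.0s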
- pyutils.decorator_utils.timed(func: Callable) Callable [source]
Prints, and logs at INFO level, the runtime of the decorated function at each invocation.
>>> @timed
... def foo():
...     import time
...     time.sleep(0.01)
>>> foo()
Finished foo in ...
- Parameters:
func (Callable) –
- Return type:
Callable
- pyutils.decorator_utils.timeout(seconds: float = 1.0, use_signals: bool | None = None, timeout_exception=<class 'TimeoutError'>, error_message='Function call timed out')[source]
Add a timeout to a function. If the function takes longer than the given timeout (in seconds) it will raise an exception and return control to the caller.
Note
The use_signals parameter is included in order to support multiprocessing scenarios (signal can only be used from the process’ main thread). When not using signals, timeout granularity is rounded to the nearest 0.1s and the implementation polls.
Warning
Beware that a @timeout decorator applied at module level is evaluated at module load time and not when the wrapped function is invoked. This is somewhat counterintuitive and tricky and it can lead to problems when relying on the automatic main thread detection code (use_signals=None, the default) since the import probably happens on the main thread while the invocation may happen on a different thread (which can’t use signals). If in doubt, do not use the automatic signal safety logic; set the use_signals argument explicitly (see the sketch after the examples below).
- Raises:
Exception – the timeout was reached
- Parameters:
seconds (float) –
use_signals (bool | None) –
It is illegal to pass anything other than a function as the first parameter. The function is wrapped and returned to the caller.
>>> @timeout(0.2)
... def foo(delay: float):
...     time.sleep(delay)
...     return "ok"
>>> foo(0)
'ok'
>>> foo(1.0)
Traceback (most recent call last):
...
Exception: Function call timed out
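A minimal sketch of the explicit use_signals=False pattern recommended in the warning above; slow_work and the specific delays are invented for illustration:
import threading
import time

from pyutils import decorator_utils

# use_signals=False because this function will be invoked from a worker
# thread and signal-based timeouts only work on the process' main thread.
@decorator_utils.timeout(0.5, use_signals=False)
def slow_work(delay: float) -> str:
    time.sleep(delay)
    return "done"

def worker() -> None:
    print(slow_work(0.1))      # finishes within the 0.5s budget
    try:
        slow_work(2.0)         # exceeds the budget
    except Exception as e:
        print(f"timed out: {e}")

t = threading.Thread(target=worker)
t.start()
t.join()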
pyutils.dict_utils module
This module contains helper functions for dealing with Python dictionaries.
- pyutils.dict_utils.coalesce(inputs: ~typing.Iterator[~typing.Dict[~typing.Hashable, ~typing.Any]], *, aggregation_function: ~typing.Callable[[~typing.Any, ~typing.Any, ~typing.Any], ~typing.Any] = <function coalesce_by_creating_list>) Dict[Hashable, Any] [source]
Coalesce (i.e. combine) N input dicts into one output dict containing the union of all keys / values in every input dict. When keys collide, apply the aggregation_function which, by default, creates a list of values with the same key in the output dict.
- Parameters:
inputs (Iterator[Dict[Hashable, Any]]) – an iterable set of dicts to coalesce
aggregation_function (Callable[[Any, Any, Any], Any]) –
a Callable to deal with key collisions; one of the functions below (already defined) or your own strategy:
coalesce_by_creating_list(): creates a list of values with the same key in the output dict.
coalesce_by_creating_set(): creates a set of values with the same key in the output dict.
coalesce_first_write_wins(): only preserves the first value with a duplicated key. Others are dropped silently.
coalesce_last_write_wins(): only preserves the last value with a duplicated key. Others are dropped silently.
raise_on_duplicated_keys(): raises an Exception on duplicated keys; use when keys should never collide.
Your own strategy: Callables will be passed the key and two values and can return whatever they want, which will be stored in the output dict (see the sketch after the examples below).
- Returns:
The coalesced output dict.
- Return type:
Dict[Hashable, Any]
>>> a = {'a': 1, 'b': 2}
>>> b = {'b': 1, 'c': 2, 'd': 3}
>>> c = {'c': 1, 'd': 2}
>>> coalesce([a, b, c])
{'a': 1, 'b': [1, 2], 'c': [1, 2], 'd': [2, 3]}
>>> coalesce([a, b, c], aggregation_function=coalesce_last_write_wins)
{'a': 1, 'b': 1, 'c': 1, 'd': 2}
>>> coalesce([a, b, c], aggregation_function=raise_on_duplicated_keys)
Traceback (most recent call last):
...
KeyError: 'Key b is duplicated in more than one input dict.'
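A sketch of the “your own strategy” option mentioned above: keep_larger below is a hypothetical collision handler (not part of the library) that receives the key and both colliding values and returns whatever should be stored:
from pyutils.dict_utils import coalesce

def keep_larger(key, new_value, old_value):
    # Keep whichever of the two colliding values is larger; the key is
    # available in case a strategy wants to vary its behavior per key.
    return max(new_value, old_value)

a = {'a': 1, 'b': 2}
b = {'b': 10, 'c': 3}
print(coalesce([a, b], aggregation_function=keep_larger))
# expected: {'a': 1, 'b': 10, 'c': 3}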
- pyutils.dict_utils.coalesce_by_creating_list(_, new_value, old_value)[source]
Helper for use with
coalesce()
that creates a list on collision.
- pyutils.dict_utils.coalesce_by_creating_set(key, new_value, old_value)[source]
Helper for use with
coalesce()
that creates a set on collision.
- pyutils.dict_utils.coalesce_first_write_wins(_, discarded_new_value, old_value)[source]
Helper for use with coalesce() that preserves the old value and discards the new one on collision.
- pyutils.dict_utils.coalesce_last_write_wins(_, new_value, discarded_old_value)[source]
Helper for use with coalesce() that klobbers the old value with the new one on collision.
- pyutils.dict_utils.dict_to_key_value_lists(d: Dict[Hashable, Any]) Tuple[List[Hashable], List[Any]] [source]
Given a dict, decompose it into a list of keys and values.
- Parameters:
d (Dict[Hashable, Any]) – a dict
- Returns:
A tuple of two lists: the first is the keys and the second is the values.
- Return type:
Tuple[List[Hashable], List[Any]]
>>> d = {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'} >>> (k, v) = dict_to_key_value_lists(d) >>> k ['name', 'phone', 'address', 'zip'] >>> v ['scott', '555-1212', '123 main st.', '12345']
- pyutils.dict_utils.init_or_inc(d: ~typing.Dict[~typing.Hashable, ~typing.Any], key: ~typing.Hashable, *, init_value: ~typing.Any = 1, inc_function: ~typing.Callable[[...], ~typing.Any] = <function <lambda>>) bool [source]
Initializes a dict value (if it doesn’t exist) or increments it (using the inc_function, which is customizable) if it already exists.
See also
defaultdict
(https://docs.python.org/3/library/collections.html#collections.defaultdict) for a more pythonic alternative.- Parameters:
d (Dict[Hashable, Any]) – the dict to increment or initialize a value in
key (Hashable) – the key to increment or initialize
init_value (Any) – default initial value (see also
dict.setdefault()
)
inc_function (Callable[[...], Any]) – Callable used to increment a value
- Returns:
True if the key already existed or False otherwise
- Return type:
bool
See also: collections.defaultdict and collections.Counter.
>>> d = {}
>>> init_or_inc(d, "test")
False
>>> init_or_inc(d, "test")
True
>>> init_or_inc(d, 'ing')
False
>>> d
{'test': 2, 'ing': 1}
- pyutils.dict_utils.item_with_max_value(d: Dict[Hashable, Any]) Tuple[Hashable, Any] [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable values
- Returns:
The key and value of the item with the highest value in a dict as a Tuple[key, value].
- Return type:
Tuple[Hashable, Any]
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> item_with_max_value(d) ('c', 3) >>> item_with_max_value({}) Traceback (most recent call last): ... ValueError: max() arg is an empty sequence
- pyutils.dict_utils.item_with_min_value(d: Dict[Hashable, Any]) Tuple[Hashable, Any] [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable values
- Returns:
The key and value of the item with the lowest value in a dict as a Tuple[key, value].
- Return type:
Tuple[Hashable, Any]
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> item_with_min_value(d) ('a', 1)
- pyutils.dict_utils.key_with_max_value(d: Dict[Hashable, Any]) Hashable [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable keys
- Returns:
The maximum key in the dict when comparing the keys with each other.
- Return type:
Hashable
Note
This code totally ignores values; it is comparing key against key to find the maximum key in the keyspace.
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> key_with_max_value(d) 'c'
- pyutils.dict_utils.key_with_min_value(d: Dict[Hashable, Any]) Hashable [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable keys
- Returns:
The minimum key in the dict when comparing the keys with each other.
- Return type:
Hashable
Note
This code totally ignores values; it is comparing key against key to find the minimum key in the keyspace.
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> key_with_min_value(d) 'a'
- pyutils.dict_utils.max_key(d: Dict[Comparable, Any]) Comparable [source]
- Parameters:
d (Dict[Comparable, Any]) – a dict with comparable keys
- Returns:
The maximum key in dict (ignoring values totally)
- Return type:
Comparable
Note
This code totally ignores values; it is comparing key against key to find the maximum key in the keyspace.
>>> d = {'a': 3, 'b': 2, 'c': 1} >>> max_key(d) 'c'
- pyutils.dict_utils.max_value(d: Dict[Hashable, Any]) Any [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable values
- Returns:
The maximum value in the dict without its key.
- Return type:
Any
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> max_value(d) 3
- pyutils.dict_utils.min_key(d: Dict[Comparable, Any]) Comparable [source]
- Parameters:
d (Dict[Comparable, Any]) – a dict with comparable keys
- Returns:
The minimum key in dict (ignoring values totally)
- Return type:
Comparable
Note
This code totally ignores values; it is comparing key against key to find the minimum key in the keyspace.
>>> d = {'a': 3, 'b': 2, 'c': 1} >>> min_key(d) 'a'
- pyutils.dict_utils.min_value(d: Dict[Hashable, Any]) Any [source]
- Parameters:
d (Dict[Hashable, Any]) – a dict with comparable values
- Returns:
The minimum value in the dict without its key.
- Return type:
Any
>>> d = {'a': 1, 'b': 2, 'c': 3} >>> min_value(d) 1
- pyutils.dict_utils.parallel_lists_to_dict(keys: List[Hashable], values: List[Any]) Dict[Hashable, Any] [source]
Given two parallel lists (keys and values), create and return a dict.
- Parameters:
keys (List[Hashable]) – a list of keys (with no duplicates)
values (List[Any]) – a parallel list (to keys) containing values
- Returns:
A dict composed of zipping the keys list and values list together.
- Raises:
ValueError – if the keys and values lists are not the same length.
- Return type:
Dict[Hashable, Any]
>>> k = ['name', 'phone', 'address', 'zip'] >>> v = ['scott', '555-1212', '123 main st.', '12345'] >>> parallel_lists_to_dict(k, v) {'name': 'scott', 'phone': '555-1212', 'address': '123 main st.', 'zip': '12345'}
- pyutils.dict_utils.raise_on_duplicated_keys(key, new_value, old_value)[source]
Helper for use with
coalesce()
that raises an exception when a collision is detected.
- pyutils.dict_utils.shard(d: Dict[Hashable, Any], size: int) Iterator[Dict[Hashable, Any]] [source]
Shards (i.e. splits) a dict into N subdicts which, together, contain all keys/values from the original unsharded dict.
- Parameters:
d (Dict[Hashable, Any]) – the input dict to be sharded (split)
size (int) – the ideal shard size (number of elements per shard)
- Returns:
A generator that yields subsequent shards.
- Return type:
Iterator[Dict[Hashable, Any]]
Note
If len(d) is not an even multiple of size then the last shard will not have size items in it. It will have len(d) % size items instead.
>>> d = {
...     'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5, 'f': 6,
...     'g': 7, 'h': 8, 'i': 9, 'j': 10, 'k': 11, 'l': 12,
... }
>>> for r in shard(d, 5):
...     r
{'a': 1, 'b': 2, 'c': 3, 'd': 4, 'e': 5}
{'f': 6, 'g': 7, 'h': 8, 'i': 9, 'j': 10}
{'k': 11, 'l': 12}
pyutils.exec_utils module
Helper methods concerned with executing subprocesses.
- pyutils.exec_utils.cmd(command: str, timeout_seconds: float | None = None) str [source]
Run a command and capture its output to stdout and stderr into a string buffer. Return that string as this function’s output.
- Parameters:
command (str) – the command to run
timeout_seconds (float | None) – the max number of seconds to allow the subprocess to execute or None to indicate no timeout
- Returns:
The captured output of the subprocess’ stdout as a string buffer
- Raises:
CalledProcessError – the child process didn’t exit cleanly
TimeoutExpired – the child process ran too long
- Return type:
str
Warning
This function invokes a subshell, beware of shell-injection attacks. Your code should sanitize the command using shlex.quote() on user-provided data before invoking this. See: https://docs.python.org/3/library/subprocess.html#security-considerations
>>> cmd('/bin/echo foo')[:-1]
'foo'
>>> cmd('/bin/sleep 2', 0.01)
Traceback (most recent call last):
...
subprocess.TimeoutExpired: Command '/bin/sleep 2' timed out after 0.01 seconds
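A small sketch of the sanitization the warning asks for; the hostile input string is just an example:
import shlex

from pyutils import exec_utils

user_supplied = "hello; rm -rf /"       # imagine this came from a user
safe = shlex.quote(user_supplied)       # neutralizes the ';' for the shell
print(exec_utils.cmd(f"/bin/echo {safe}", timeout_seconds=5.0))
# prints the literal text "hello; rm -rf /"; nothing extra was executed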
- pyutils.exec_utils.cmd_exitcode(command: str, timeout_seconds: float | None = None) int [source]
Run a command silently in the background and return its exit code once it has finished.
- Parameters:
command (str) – the command to run
timeout_seconds (float | None) – the optional max number of seconds to allow the subprocess to execute, or None to indicate no timeout
- Returns:
the exit status of the subprocess once the subprocess has exited
- Raises:
TimeoutExpired – if timeout_seconds is provided and the child process executes longer than the limit.
- Return type:
int
>>> cmd_exitcode('/bin/echo foo', 10.0) 0
>>> cmd_exitcode('/bin/sleep 2', 0.01) Traceback (most recent call last): ... subprocess.TimeoutExpired: Command '['/bin/bash', '-c', '/bin/sleep 2']' timed out after 0.01 seconds
- pyutils.exec_utils.cmd_in_background(command: str, *, silent: bool = False) Popen [source]
Spawns a child process in the background and registers an exit handler to make sure we kill it if the parent process (us) is terminated.
- Parameters:
command (str) – the command to run
silent (bool) – do not allow any output from the child process to be displayed in the parent process’ window
- Returns:
the Popen object that can be used to communicate with the background process.
- Return type:
Popen
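A brief sketch of how the returned Popen might be used; the sleep command is just a stand-in for a long-running child:
from pyutils import exec_utils

proc = exec_utils.cmd_in_background('/bin/sleep 30', silent=True)
print(proc.pid)      # it's a normal subprocess.Popen object
print(proc.poll())   # None while the child is still running
proc.terminate()     # ...or we can stop it ourselves
proc.wait()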
- pyutils.exec_utils.cmd_list(command: List[str]) str [source]
Run a command with args encapsulated in a list and return the output text as a string.
- Raises:
CalledProcessError – the child process didn’t exit cleanly
TimeoutExpired – the child process ran too long
- Parameters:
command (List[str]) –
- Return type:
str
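A quick sketch of the list-based form; unlike cmd(), the program and its arguments are passed as separate list elements:
from pyutils import exec_utils

out = exec_utils.cmd_list(['/bin/echo', 'hello', 'world'])
print(out)  # 'hello world\n'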
- pyutils.exec_utils.cmd_showing_output(command: str, *, timeout_seconds: float | None = None) int [source]
Kick off a child process. Capture and emit all output that it produces on stdout and stderr in a raw, character by character, manner so that we don’t have to wait on newlines. This was done to capture, for example, the output of a subprocess that creates dots to show incremental progress on a task and render it correctly.
- Parameters:
command (str) – the command to execute
timeout_seconds (float | None) – terminate the subprocess if it takes longer than N seconds; None means to wait as long as it takes.
- Returns:
the exit status of the subprocess once the subprocess has exited. Raises TimeoutExpired after killing the subprocess if the timeout expires.
- Raises:
TimeoutExpired – if timeout expires before child terminates
- Return type:
int
- Side effects:
prints all output of the child process (stdout or stderr)
Warning
This function invokes a subshell, beware of shell-injection attacks. Your code should sanitize the command using
shlex.quote()
on user-provided data before invoking this. See: https://docs.python.org/3/library/subprocess.html#security-considerations
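A small sketch; the shell loop below just prints dots slowly so the character-by-character echoing is visible:
from pyutils import exec_utils

rc = exec_utils.cmd_showing_output(
    'for i in 1 2 3 4 5; do printf .; sleep 0.2; done; echo " done"',
    timeout_seconds=10.0,
)
print(f"child exited with status {rc}")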
- pyutils.exec_utils.run_silently(command: str, timeout_seconds: float | None = None) None [source]
Run a command silently.
- Parameters:
command (str) – the command to run.
timeout_seconds (float | None) – the optional max number of seconds to allow the subprocess to execute or None (default) to indicate no time limit.
- Returns:
No return value; error conditions (including non-zero child process exits) produce exceptions.
- Raises:
CalledProcessError – if the child process fails (i.e. exit != 0)
TimeoutExpired – if the child process executes too long.
- Return type:
None
Warning
This function invokes a subshell, beware of shell-injection attacks. Your code should sanitize the command using shlex.quote() on user-provided data before invoking this. See: https://docs.python.org/3/library/subprocess.html#security-considerations
>>> run_silently("/usr/bin/true")
>>> run_silently("/usr/bin/false")
Traceback (most recent call last):
...
subprocess.CalledProcessError: Command '/usr/bin/false' returned non-zero exit status 1.
pyutils.function_utils module
Helper methods dealing with functions.
- pyutils.function_utils.function_identifier(f: Callable) str [source]
Given a named Callable, return a string that identifies it. Usually that string is just “__module__:__name__” but there’s a corner case: when __module__ is __main__ (i.e. the callable is defined in the same module as __main__). In this case, f.__module__ returns “__main__” instead of the file that it is defined in. Work around this using pathlib.Path.
- Parameters:
f (Callable) – a Callable
- Returns:
A unique identifier for that callable in the format module:function that avoids the pseudo-module ‘__main__’
- Return type:
str
>>> function_identifier(function_identifier) 'function_utils:function_identifier'
pyutils.geocode module
Wrapper around US Census address geocoder API described here:
https://www2.census.gov/geo/pdfs/maps-data/data/Census_Geocoder_User_Guide.pdf
https://geocoding.geo.census.gov/geocoder/Geocoding_Services_API.pdf
Also try:
$ curl --form addressFile=@localfile.csv \
--form benchmark=2020 \
https://geocoding.geo.census.gov/geocoder/locations/addressbatch \
--output geocoderesult.csv
- pyutils.geocode.batch_geocode_addresses(addresses: List[str]) List[str] | None [source]
Send a list of addresses for batch geocoding to a web service operated by the US Census Bureau.
- Parameters:
addresses (List[str]) – a list of addresses to geocode. Each line of the input list should be a single address in the form: “STREET ADDRESS, CITY, STATE, ZIPCODE”. Individual address components may be omitted and the service will make educated guesses but the comma delimiters between address components may not be omitted.
- Returns:
An array of the same size as the input array with one answer record per line. Returns None on error.
- Return type:
List[str] | None
Note: this code will deal with requests >10k addresses by chunking them internally because the census website disallows requests > 10k lines.
>>> batch_geocode_addresses( ... [ ... '4600 Silver Hill Rd, Washington, DC, 20233', ... '935 Pennsylvania Avenue, NW, Washington, DC, 20535-0001', ... '1600 Pennsylvania Avenue NW, Washington, DC, 20500', ... '700 Pennsylvania Avenue NW, Washington, DC, 20408', ... ] ... ) ['"1"," 4600 Silver Hill Rd, Washington, DC, 20233","Match","Exact","4600 SILVER HILL RD, WASHINGTON, DC, 20233","-76.92748724230091,38.84601622386623","76355984","L","24","033","802405","2004"', '"2"," 935 Pennsylvania Avenue, NW, Washington, DC","No_Match"', '"3"," 1600 Pennsylvania Avenue NW, Washington, DC, 20500","Match","Exact","1600 PENNSYLVANIA AVE NW, WASHINGTON, DC, 20500","-77.03654395730786,38.89869091865552","76225813","L","11","001","980000","1034"', '"4"," 700 Pennsylvania Avenue NW, Washington, DC, 20408","Match","Exact","700 PENNSYLVANIA AVE NW, WASHINGTON, DC, 20408","-77.02305485155983,38.89356561956657","76226346","L","11","001","980000","1025"']
- pyutils.geocode.geocode_address(address: str) Dict[str, ParsedJSON] | List[ParsedJSON] | str | int | float | bool | None [source]
Send a single address to the US Census geocoding API in order to lookup relevant data about it (including, if possible, its lat/long). The response is a parsed JSON chunk of data with N addressMatches in the result section and the details of each match within it.
- Parameters:
address (str) – the full address to lookup in the form: “STREET ADDRESS, CITY, STATE, ZIPCODE”. These components may be omitted and the service will make educated guesses but the commas delimiting each component must be included.
- Returns:
A parsed json dict with a bunch of information about the address contained within it. Each ‘addressMatch’ in the JSON describes the details of a possible match. Returns None if there was an error or the address is not known.
- Return type:
Dict[str, ParsedJSON] | List[ParsedJSON] | str | int | float | bool | None
>>> json = geocode_address('4600 Silver Hill Rd,, 20233') >>> json['result']['addressMatches'][0]['matchedAddress'] '4600 SILVER HILL RD, WASHINGTON, DC, 20233'
>>> json['result']['addressMatches'][0]['coordinates'] {'x': -76.92748724230096, 'y': 38.84601622386617}
pyutils.graph module
A simple graph class that can be optionally directed and weighted and some operations on it.
- class pyutils.graph.Graph(directed: bool = False)[source]
Bases:
object
Constructs a new Graph object.
- Parameters:
directed (bool) – are we modeling a directed graph? See
add_edge()
.
- add_edge(src: str, dest: str, weight: int | float = 1) None [source]
Adds a new (optionally weighted) edge between src and dest vertexes. If the graph is not directed (see c’tor) this also adds a reciprocal edge with the same weight back from dest to src too.
Note
If either or both of src and dest are not already added to the graph, they are implicitly added by adding this edge.
- Parameters:
src (str) – the source vertex id
dest (str) – the destination vertex id
weight (int | float) – optionally, the weight of the edge(s) added
- Return type:
None
>>> g = Graph() >>> g.add_edge('a', 'b') >>> g.add_edge('b', 'c', weight=2) >>> len(g.get_vertices()) 3 >>> g.get_edges() {'a': {'b': 1}, 'b': {'a': 1, 'c': 2}, 'c': {'b': 2}}
- add_vertex(vertex_id: str) bool [source]
Adds a new vertex to the graph.
- Parameters:
vertex_id (str) – the unique identifier of the new vertex.
- Returns:
True unless vertex_id is already in the graph.
- Return type:
bool
>>> g = Graph() >>> g.add_vertex('a') True >>> g.add_vertex('b') True >>> g.add_vertex('a') False >>> len(g.get_vertices()) 2
- bfs(starting_vertex: str, target: str | None = None) Generator[str, None, None] [source]
Performs a breadth first traversal of the graph.
- Parameters:
starting_vertex (str) – The BFS starting point.
target (str | None) – The vertex that, if found, we should halt the search.
- Returns:
An ordered sequence of vertex ids visited by the traversal.
- Return type:
Generator[str, None, None]
>>> g = Graph()
>>> g.add_edge('A', 'B')
>>> g.add_edge('A', 'C')
>>> g.add_edge('B', 'D')
>>> g.add_edge('C', 'D')
>>> g.add_edge('D', 'E')
>>> g.add_edge('E', 'F')
>>> g.add_edge('E', 'G')
>>> g.add_edge('F', 'F')
>>> for node in g.bfs('A'):
...     print(node)
A
B
C
D
E
F
G
>>> for node in g.bfs('F', 'G'):
...     print(node)
F
E
D
G
- dfs(starting_vertex: str, target: str | None = None) Generator[str, None, None] [source]
Performs a depth first traversal of the graph.
- Parameters:
starting_vertex (str) – The DFS starting point.
target (str | None) – The vertex that, if found, indicates to halt.
- Returns:
An ordered sequence of vertex ids visited by the traversal.
- Return type:
Generator[str, None, None]
>>> g = Graph()
>>> g.add_edge('A', 'B')
>>> g.add_edge('A', 'C')
>>> g.add_edge('B', 'D')
>>> g.add_edge('C', 'D')
>>> g.add_edge('D', 'E')
>>> g.add_edge('E', 'F')
>>> g.add_edge('E', 'G')
>>> g.add_edge('F', 'F')
>>> for node in g.dfs('A'):
...     print(node)
A
B
D
C
E
F
G
>>> for node in g.dfs('F', 'B'):
...     print(node)
F
E
D
B
- get_edges() Dict[str, Dict[str, int | float]] [source]
- Returns:
A dict whose keys are source vertexes and values are dicts of destination vertexes with values describing the weight of the edge from source to destination.
- Return type:
Dict[str, Dict[str, int | float]]
>>> g = Graph(directed=True) >>> g.add_edge('a', 'b') >>> g.add_edge('b', 'c', weight=2) >>> len(g.get_vertices()) 3 >>> g.get_edges() {'a': {'b': 1}, 'b': {'c': 2}, 'c': {}}
- get_vertices() List[str] [source]
- Returns:
a list of the vertex ids in the graph.
- Return type:
List[str]
>>> g = Graph() >>> g.add_vertex('a') True >>> g.add_edge('b', 'c') >>> g.get_vertices() ['a', 'b', 'c']
- minimum_path_between(source: str, dest: str) Tuple[int | float | None, List[str]] [source]
Compute the minimum path (lowest cost path) between source and dest.
Note
This method runs Dijkstra’s algorithm (https://en.wikipedia.org/wiki/Dijkstra%27s_algorithm) internally and caches the results. Subsequent calls made with the same source node before modifying the graph are less expensive due to these cached intermediate results.
- Returns:
A tuple containing the minimum distance of the path and the path itself. If there is no path between the requested nodes, returns (None, []).
- Parameters:
source (str) –
dest (str) –
- Return type:
Tuple[int | float | None, List[str]]
>>> g = Graph() >>> g.add_edge('A', 'B', 3) >>> g.add_edge('A', 'C', 2) >>> g.add_edge('B', 'D') >>> g.add_edge('C', 'D') >>> g.add_edge('D', 'E') >>> g.add_edge('E', 'F') >>> g.add_edge('E', 'G') >>> g.add_edge('F', 'F') >>> g.add_vertex('H') True >>> g.minimum_path_between('A', 'D') (3, ['A', 'C', 'D']) >>> g.minimum_path_between('A', 'H') (None, [])
- remove_edge(source: str, dest: str)[source]
Remove a previously added edge in the graph. If the graph is not directed (see
__init__()
), also removes the reciprocal edge from dest back to source.Note
This method does not remove vertexes (unlinked or otherwise).
- Parameters:
source (str) – the source vertex of the edge to remove
dest (str) – the destination vertex of the edge to remove
>>> g = Graph() >>> g.add_edge('A', 'B') >>> g.add_edge('B', 'C') >>> g.get_edges() {'A': {'B': 1}, 'B': {'A': 1, 'C': 1}, 'C': {'B': 1}} >>> g.remove_edge('A', 'B') >>> g.get_edges() {'B': {'C': 1}, 'C': {'B': 1}}
pyutils.id_generator module
A helper class for generating thread safe monotonically increasing id numbers.
Note
This code is thread safe but not process safe; for use only within one python process.
- pyutils.id_generator.get(name: str, *, start: int = 0) int [source]
Returns a thread-safe, monotonically increasing id suitable for use as a globally unique identifier.
- Parameters:
name (str) – the sequence identifier name.
start (int) – the starting id (i.e. the first id that should be returned)
- Returns:
An integer id such that within one sequence identifier name the id returned is unique and is the maximum id ever returned.
- Return type:
int
>>> import id_generator >>> id_generator.get('student_id') 0 >>> id_generator.get('student_id') 1 >>> id_generator.get('employee_id', start=10000) 10000 >>> id_generator.get('employee_id', start=10000) 10001
pyutils.input_utils module
Terminal input utilities.
- class pyutils.input_utils.KeystrokeReader[source]
Bases:
AbstractContextManager
Save the terminal settings, put the terminal in raw mode and return a helper that can be used to wait for and return a single keystroke event (with a timeout). Restores the previous terminal mode on exit.
Example usage:
with input_utils.KeystrokeReader() as get_keystroke:
    while True:
        # Check / parse any keys.
        key = get_keystroke(timeout_seconds=0.01)
        if key:
            if key == ' ':
                if not stopwatch.running:
                    stopwatch.start()
                else:
                    stopwatch.pause()
            elif key == 'r':
                stopwatch.reset()
            elif key == 'l':
                print(f"Lap: {stopwatch.get_elapsed_time():.5f}")
            elif key == 'q' or key == chr(3):
                break
        ...
pyutils.iter_utils module
A collection of Iterator
subclasses that can be composed
with another iterator and provide extra functionality:
- class pyutils.iter_utils.PeekingIterator(source_iter: Iterator)[source]
Bases:
Iterator
An iterator that lets you peek() at the next item on deck. Returns None when there is no next item (i.e. when __next__() will produce a StopIteration exception).
>>> p = PeekingIterator(iter(range(3)))
>>> p.__next__()
0
>>> p.peek()
1
>>> p.peek()
1
>>> p.__next__()
1
>>> p.__next__()
2
>>> p.peek() == None
True
>>> p.__next__()
Traceback (most recent call last):
...
StopIteration
- Parameters:
source_iter (Iterator) – the iterator we want to peek at
- peek() Any | None [source]
Peek at the upcoming value on the top of our contained
Iterator
non-destructively (i.e. calling__next__()
will still produce the peeked value).- Returns:
The value that will be produced by the contained iterator next or None if the contained Iterator is exhausted and will raise StopIteration when read.
- Return type:
Any | None
- class pyutils.iter_utils.PushbackIterator(source_iter: Iterator)[source]
Bases:
Iterator
An iterator that allows you to push items back onto the front of the sequence so that they are produced before the items at the front/top of the contained Iterator. e.g.
>>> i = PushbackIterator(iter(range(3)))
>>> i.__next__()
0
>>> i.push_back(99)
>>> i.push_back(98)
>>> i.__next__()
98
>>> i.__next__()
99
>>> i.__next__()
1
>>> i.__next__()
2
>>> i.push_back(100)
>>> i.__next__()
100
>>> i.__next__()
Traceback (most recent call last):
...
StopIteration
- Parameters:
source_iter (Iterator) –
- class pyutils.iter_utils.SamplingIterator(source_iter: Iterator, sample_size: int)[source]
Bases:
Iterator
An Iterator that simply echoes what its source_iter produces but also collects a random sample (of size sample_size) from the stream that can be queried at any time.
Note
Until sample_size elements have been produced by the source_iter, the returned sample will contain fewer than sample_size elements.
Note
If sample_size is >= len(source_iter) then the sample will simply be a copy of source_iter.
>>> import collections >>> import random
>>> random.seed(22) >>> s = SamplingIterator(iter(range(100)), 10) >>> s.__next__() 0
>>> s.__next__() 1
>>> s.get_sample() [0, 1]
>>> collections.deque(s) deque([2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99])
>>> s.get_sample() [78, 18, 47, 83, 93, 26, 25, 73, 94, 38]
- Parameters:
source_iter (Iterator) –
sample_size (int) –
- get_sample() List[Any] [source]
- Returns:
The current sample set populated randomly from the items returned by the contained
Iterator
so far.- Return type:
List[Any]
Note
Until sample_size elements have been produced by the source_iter, the returned sample will contain fewer than sample_size elements.
Note
If sample_size is >= len(source_iter) then the sample will simply be a copy of source_iter.
pyutils.list_utils module
This module contains helper functions for dealing with Python lists.
- pyutils.list_utils.all_unique(lst: List[Any]) bool [source]
Inverted alias for
contains_duplicates()
.- Parameters:
lst (List[Any]) –
- Return type:
bool
- pyutils.list_utils.binary_search(lst: Sequence[Any], target: Any) Tuple[bool, int] [source]
Performs a binary search on lst (which must already be sorted).
- Parameters:
lst (Sequence[Any]) – the (already sorted!) list in which to search
target (Any) – the item value to be found
- Returns:
A Tuple composed of a bool which indicates whether the target was found and an int which indicates the index closest to target whether it was found or not.
- Return type:
Tuple[bool, int]
>>> a = [1, 4, 5, 6, 7, 9, 10, 11] >>> binary_search(a, 4) (True, 1)
>>> binary_search(a, 12) (False, 8)
>>> binary_search(a, 3) (False, 1)
>>> binary_search(a, 2) (False, 1)
>>> a.append(9) >>> binary_search(a, 4) Traceback (most recent call last): ... AssertionError
- pyutils.list_utils.contains_duplicates(lst: List[Any]) bool [source]
Does the list contain duplicate elements or not?
- Parameters:
lst (List[Any]) – the list to check for duplicates
- Returns:
True if the input lst contains duplicated items and False otherwise.
- Return type:
bool
>>> lst = [1, 2, 1, 3, 3, 4, 4, 5, 6, 1, 3, 4] >>> contains_duplicates(lst) True
>>> contains_duplicates(dedup_list(lst)) False
- pyutils.list_utils.dedup(lst: List[Any]) List[Any] [source]
Alias for
dedup_list()
.- Parameters:
lst (List[Any]) –
- Return type:
List[Any]
- pyutils.list_utils.dedup_list(lst: List[Any]) List[Any] [source]
Remove duplicates from the list.
- Parameters:
lst (List[Any]) – the list to de-duplicate
- Returns:
The de-duplicated input list. That is, the same list with all duplicate items removed, leaving only the unique items from the input lst.
- Return type:
List[Any]
>>> dedup_list([1, 2, 1, 3, 3, 4, 2, 3, 4, 5, 1]) [1, 2, 3, 4, 5]
- pyutils.list_utils.flatten(lst: List[Any]) List[Any] [source]
Flatten out a list. That is, for each item in list that contains a list, remove the nested list and replace it with its items.
- Parameters:
lst (List[Any]) – the list to flatten
- Returns:
The flattened list. See example.
- Return type:
List[Any]
>>> flatten([ 1, [2, 3, 4, [5], 6], 7, [8, [9]]]) [1, 2, 3, 4, 5, 6, 7, 8, 9]
- pyutils.list_utils.least_common(lst: List[Any], *, count: int = 1) Any [source]
Return the N least common items in the list.
- Parameters:
lst (List[Any]) – the list to find the least common item in
count (int) – the number of least common items to return
- Returns:
The count least common items in lst (a single item when count == 1, otherwise a list).
- Return type:
Any
Warning
In the case of ties, which least common item is returned is undefined.
>>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4]) 4
>>> least_common([1, 1, 1, 2, 2, 3, 3, 3, 4], count=2) [4, 2]
- pyutils.list_utils.most_common(lst: List[Any], *, count: int = 1) Any [source]
Return the N most common items in the list.
- Parameters:
lst (List[Any]) – the list to find the most common item in
count (int) – the number of most common items to return
- Returns:
The count most common items in lst (a single item when count == 1, otherwise a list).
- Return type:
Any
Warning
In the case of ties for most common item, which most common item is returned is undefined.
>>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4]) 3
>>> most_common([1, 1, 1, 2, 2, 3, 3, 3, 3, 4, 4], count=2) [3, 1]
- pyutils.list_utils.ngrams(lst: Sequence[T], n: int) Generator[Sequence[T], T, None] [source]
Return the ngrams in the sequence.
- Parameters:
lst (Sequence[T]) – the list in which to find ngrams
n (int) – the size of each ngram to return
- Returns:
A generator that yields all ngrams of size n in lst.
- Return type:
Generator[Sequence[T], T, None]
>>> seq = 'encyclopedia' >>> for _ in ngrams(seq, 3): ... _ 'enc' 'ncy' 'cyc' 'ycl' 'clo' 'lop' 'ope' 'ped' 'edi' 'dia'
>>> seq = ['this', 'is', 'an', 'awesome', 'test'] >>> for _ in ngrams(seq, 3): ... _ ['this', 'is', 'an'] ['is', 'an', 'awesome'] ['an', 'awesome', 'test']
- pyutils.list_utils.permute(seq: str) Generator[str, str, None] [source]
Returns all permutations of a sequence.
- Parameters:
seq (str) – the sequence to permute
- Returns:
All permutations creatable by shuffling items in seq.
- Return type:
Generator[str, str, None]
Warning
Takes O(N!) time, beware of large inputs.
>>> for x in permute('cat'): ... print(x) cat cta act atc tca tac
- pyutils.list_utils.population_counts(lst: Sequence[Any]) Counter [source]
Return a population count mapping for the list (i.e. the keys are list items and the values are the number of occurrences of that list item in the original list). Note: this is used internally to implement
most_common()
andleast_common()
.- Parameters:
lst (Sequence[Any]) – the list whose population should be counted
- Returns:
a Counter containing the population count of lst items.
- Return type:
Counter
>>> population_counts([1, 1, 1, 2, 2, 3, 3, 3, 4]) Counter({1: 3, 3: 3, 2: 2, 4: 1})
- pyutils.list_utils.powerset(seq: Sequence[Any]) Iterator[Sequence[Any]] [source]
Returns the powerset of the items in the input sequence. That is, return the set containing every set constructable using items from seq (including the empty set and the “full” set: seq itself).
- Parameters:
seq (Sequence[Any]) – the sequence whose items will be used to construct the powerset.
- Returns:
The powerset composed of all sets possible to create with items from seq. See: https://en.wikipedia.org/wiki/Power_set.
- Return type:
Iterator[Sequence[Any]]
>>> for x in powerset([1, 2, 3]): ... print(x) () (1,) (2,) (3,) (1, 2) (1, 3) (2, 3) (1, 2, 3)
- pyutils.list_utils.prepend(item: Any, lst: List[Any]) List[Any] [source]
Prepend an item to a list. An alias for list.insert(0, item). The opposite of list.append().
- Parameters:
item (Any) – the item to be prepended
lst (List[Any]) – the list on which to prepend
- Returns:
The list with item prepended.
- Return type:
List[Any]
>>> prepend('foo', ['bar', 'baz']) ['foo', 'bar', 'baz']
- pyutils.list_utils.remove_list_if_one_element(lst: List[Any]) Any [source]
Remove the list and return the 0th element iff its length is one.
- Parameters:
lst (List[Any]) – the List to check
- Returns:
Either lst (if len(lst) > 1) or lst[0] (if len(lst) == 1).
- Return type:
Any
>>> remove_list_if_one_element([1234]) 1234
>>> remove_list_if_one_element([1, 2, 3, 4]) [1, 2, 3, 4]
- pyutils.list_utils.scramble(seq: MutableSequence[Any]) MutableSequence[Any] [source]
An alias for
shuffle()
.- Parameters:
seq (MutableSequence[Any]) –
- Return type:
MutableSequence[Any]
- pyutils.list_utils.shard(lst: List[Any], size: int) Iterator[Any] [source]
Shards (i.e. splits) a list into sublists of size size which, together, contain all items in the original unsharded list.
- Parameters:
lst (List[Any]) – the original input list to shard
size (int) – the ideal shard size (number of elements per shard)
- Returns:
A generator that yields successive shards.
- Return type:
Iterator[Any]
Note
If len(lst) is not an even multiple of size then the last shard will not have size items in it. It will have len(lst) % size items instead.
>>> for sublist in shard([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12], 3): ... [_ for _ in sublist] [1, 2, 3] [4, 5, 6] [7, 8, 9] [10, 11, 12]
- pyutils.list_utils.shuffle(seq: MutableSequence[Any]) MutableSequence[Any] [source]
Shuffles a sequence into a random order.
- Parameters:
seq (MutableSequence[Any]) – a sequence to shuffle
- Returns:
The shuffled sequence.
- Return type:
MutableSequence[Any]
>>> random.seed(22) >>> shuffle([1, 2, 3, 4, 5]) [3, 4, 1, 5, 2]
>>> shuffle('example') 'empaelx'
- pyutils.list_utils.transpose(lst: List[Any]) List[Any] [source]
Transpose a list of lists.
- Parameters:
lst (List[Any]) – the list of lists to be transposed.
- Returns:
The transposed result. See example.
- Return type:
List[Any]
>>> lst = [[1, 2], [3, 4], [5, 6]] >>> transpose(lst) [[1, 3, 5], [2, 4, 6]]
- pyutils.list_utils.uniq(lst: List[Any]) List[Any] [source]
Alias for
dedup_list()
.- Parameters:
lst (List[Any]) –
- Return type:
List[Any]
pyutils.logging_utils module
This is a module that offers an opinionated take on how whole program
logging should be initialized and controlled. It uses the standard
Python logging
but gives you control, via commandline config,
to do things such as:
Set the logging default level (debug, info, warning, error, critical) of the whole program (see: --logging_level) … and override the logging level with LoggingContext.
Prepend or append a message to every log record, also with LoggingContext.
Define the logging message format (see --logging_format and --logging_date_format), including easily adding a PID/TID marker on all messages to help with multithreaded debugging (--logging_debug_threads) and forcing module names of code that emits log messages to be included in the format (--logging_debug_modules).
Control the destination of logged messages:
log to the console/stderr (--logging_console) and/or
log to a rotated file (--logging_filename, --logging_filename_maxsize and --logging_filename_count) and/or
log to the UNIX syslog (--logging_syslog and --logging_syslog_facility).
Optionally squelch repeated messages (--logging_squelch_repeats).
Optionally log probabilistically (--logging_probabilistically).
Capture printed messages into the info log (--logging_captures_prints).
Optionally clear unwanted logging handlers added by other imports before this one (--logging_clear_preexisting_handlers).
Optionally append to system-wide records of non-zero exits (--logging_non_zero_exits_record_path) and unhandled exceptions (--logging_unhandled_top_level_exceptions_record_path) in cooperation with pyutils.bootstrap.
There are also LoggerAdapter classes to implement prefix/suffix functionality without using LoggingContext by wrapping the logger instead.
To use this functionality, call initialize_logging() early in your program entry point. If you use the pyutils.bootstrap.initialize() decorator on your program’s entry point, it will call this for you automatically.
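A minimal sketch of that entry point pattern (the main() body here is invented; see pyutils.bootstrap for the authoritative details). The --logging_* flags listed above can then be passed on the command line to control logging behavior at runtime:
import logging

from pyutils import bootstrap

logger = logging.getLogger(__name__)

@bootstrap.initialize  # parses config and calls initialize_logging() for us
def main() -> int:
    logger.info("Hello from an initialized logger.")
    return 0

if __name__ == '__main__':
    main()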
- class pyutils.logging_utils.AppendingLogAdapter(logger, extra=None)[source]
Bases:
LoggerAdapter
LoggingContext
adds prefixes and suffixes using a logging.Filter that must insert “prefix” or “suffix” members into each log record by usingPrefixAddingFilter
andSuffixAddingFilter
. This relies on the logging format string containing a %(prefix)s and a %(suffix)s to work correctly.This is an alternate approach that basically just wraps the logger in a class that has the same interface and thunks most calls down to the wrapped logger. It might be useful if you don’t want to use
LoggingContext
or its friends.>>> logger = logging.getLogger(__name__ + ".AppendingLogAdapter") >>> logger.setLevel(logging.INFO) >>> logger.addHandler(logging.StreamHandler(sys.stdout))
At this point logger doesn’t have any format string and so it is missing %(prefix)s and %(suffix)s. It also doesn’t have a
PrefixAddingFilter
orSuffixAddingFilter
added. So using it in aLoggingContext
will not work.But we can still add a prefix or suffix by just wrapping it:
>>> logger.info("TEST") TEST
>>> log = AppendingLogAdapter.wrap_logger('!!!', logger) >>> log.info("TEST") TEST!!!
Initialize the adapter with a logger and a dict-like object which provides contextual information. This constructor signature allows easy stacking of LoggerAdapters, if so desired.
You can effectively pass keyword arguments as shown in the following example:
adapter = LoggerAdapter(someLogger, dict(p1=v1, p2=”v2”))
- process(msg, kwargs)[source]
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- static wrap_logger(suffix: str, logger: Logger) LoggerAdapter [source]
Helper method around the creation of a LoggerAdapter that appends a given string to every log message produced.
- Parameters:
suffix (str) – the message to append to every log message.
logger (Logger) – the logger whose messages to modify.
- Returns:
A new logger wrapping the old one with the given behavior. The old logger will continue to behave as usual; simply drop the reference to this wrapper when it’s no longer needed.
- Return type:
LoggerAdapter
- class pyutils.logging_utils.LoggingContext(logger: Logger, *, level: int | None = None, handlers: List[Handler] | None = None, prefix: str | None = None, suffix: str | None = None)[source]
Bases:
ContextDecorator
This is a logging context that can be used to temporarily change the way we are logging within its scope. Logging changes may include:
Changing the logging level (e.g. from INFO to DEBUG)
Adding a prefix or suffix to every log message produced
Adding temporary Handlers to direct the logging output elsewhere
Setup for doctest / examples. This will normally be taken care of by code in
initialize_logging()
so you don’t have to worry about it.
>>> logging_format = "%(prefix)s%(message)s%(suffix)s"
>>> logger = logging.getLogger(__name__ + ".LoggingContext")
>>> logger.setLevel(logging.INFO)
>>> handler = logging.StreamHandler(sys.stdout)
>>> handler.setFormatter(
...     MillisecondAwareFormatter(
...         fmt=logging_format,
...         datefmt='',
...     )
... )
>>> logger.addHandler(handler)
>>> logger.addFilter(PrefixAddingFilter(None))
>>> logger.addFilter(SuffixAddingFilter(None))
First, this logger should be currently be configured to send INFO+ messages to sys.stdout. Let’s see it in action:
>>> logger.info("Hello world!") Hello world! >>> logger.debug("You should not see this")
The first example is to simply change the level of the logger. Here we temporarily change it to DEBUG within the body of the LoggingContext:
>>> with LoggingContext(logger, level=logging.DEBUG):
...     logger.debug("You should see this now")
...     logger.info("Of course you should still see this too")
You should see this now
Of course you should still see this too
>>> logger.debug("Outside of the context we are at INFO again") >>> logger.debug("(which is why you don't see these)") >>> logger.info("But you should see this at INFO level") But you should see this at INFO level
The prefix and suffix argument prepend or append a message to all log output. To do this, you need %(prefix)s and %(suffix)s placeholders in your logger format string indicating where to insert the data. This is useful, for example, to add an active request identifier to the set of log messages produced while processing it.
>>> logger.info("About to work on a new request") About to work on a new request
>>> with LoggingContext(logger, prefix='10.0.0.13> '):
...     logger.info("Working on it now")
10.0.0.13> Working on it now
>>> logger.info("Done with that request") Done with that request
LoggingContext can also be used to add temporary handler(s). This code temporarily uses two stdout handlers to double the output for testing purposes but you could also temporarily, e.g., add a RotatingFileHandler or SysLogHandler etc…
>>> with LoggingContext(logger, handlers=[logging.StreamHandler(sys.stdout)]):
...     logger.info("TEST")
TEST
TEST
Once leaving the context, the logger’s behavior is restored. In this case, the extra handler is removed so output will no longer be doubled.
>>> logger.info("OUTSIDE") OUTSIDE
LoggingContext can also be used as a decorator if that is more convenient:
>>> @LoggingContext(logger, level=logging.DEBUG)
... def log_stuff(logger):
...     logger.debug("But inside, the decorator has changed us to DEBUG")
>>> logger.debug("Outside, we're at INFO level and you don't see this")
>>> log_stuff(logger)
But inside, the decorator has changed us to DEBUG
>>> logger.debug("And, of course, out here we're still at INFO afterwards")
- Parameters:
logger (Logger) – the logger on which to operate
level (int | None) – the new level to set for the duration of the context
handlers (List[Handler] | None) – additional handlers to add for the duration of the context
prefix (str | None) – the prefix string to set for the duration of the context
suffix (str | None) – the suffix string to set for the duration of the context
- Returns:
The modified logger.
- class pyutils.logging_utils.MillisecondAwareFormatter(fmt=None, datefmt=None, style='%', validate=True, *, defaults=None)[source]
Bases:
Formatter
A formatter for adding milliseconds to log messages which, for whatever reason, the default Python logger doesn’t do.
Note
You probably don’t need to use this directly but it is wired in under
initialize_logging()
so that the timestamps in log messages have millisecond level precision.Initialize the formatter with specified format strings.
Initialize the formatter either with the specified format string, or a default as described above. Allow for specialized date formatting with the optional datefmt argument. If datefmt is omitted, you get an ISO8601-like (or RFC 3339-like) format.
Use a style parameter of ‘%’, ‘{’ or ‘$’ to specify that you want to use one of %-formatting,
str.format()
({}
) formatting orstring.Template
formatting in your format string.Changed in version 3.2: Added the
style
parameter.- converter()
timestamp[, tz] -> tz’s local time from POSIX timestamp.
- formatTime(record, datefmt=None)[source]
Return the creation time of the specified LogRecord as formatted text.
This method should be called from format() by a formatter which wants to make use of a formatted time. This method can be overridden in formatters to provide for any specific requirement, but the basic behaviour is as follows: if datefmt (a string) is specified, it is used with time.strftime() to format the creation time of the record. Otherwise, an ISO8601-like (or RFC 3339-like) format is used. The resulting string is returned. This function uses a user-configurable function to convert the creation time to a tuple. By default, time.localtime() is used; to change this for a particular formatter instance, set the ‘converter’ attribute to a function with the same signature as time.localtime() or time.gmtime(). To change it for all formatters, for example if you want all logging times to be shown in GMT, set the ‘converter’ attribute in the Formatter class.
- class pyutils.logging_utils.OnlyInfoFilter(name='')[source]
Bases:
Filter
A filter that only logs messages produced at the INFO logging level. This is used by the
--logging_info_is_print
commandline option to select a subset of the logging stream to send to a stdout handler.Initialize a filter.
Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.
- class pyutils.logging_utils.OutputMultiplexer(destination_bitv: int, *, logger=None, filenames: Iterable[str] | None = None, handles: Iterable[TextIOWrapper] | None = None)[source]
Bases:
object
A class that broadcasts printed messages to several sinks (including various logging levels, different files, different file handles, the house log, etc…). See also
OutputMultiplexerContext
for an easy usage pattern.Constructs the OutputMultiplexer instance.
- Parameters:
destination_bitv (int) – a bitvector where each bit represents an output destination. Multiple bits may be set.
logger – if LOG_* bits are set, you must pass a logger here.
filenames (Iterable[str] | None) – if FILENAMES bit is set, this should be a list of files you’d like to output into. This code handles opening and closing said files.
handles (Iterable[TextIOWrapper] | None) – if FILEHANDLES bit is set, this should be a list of already opened filehandles you’d like to output into. The handles will remain open after the scope of the multiplexer.
- Raises:
ValueError – invalid combination of arguments (e.g. the filenames argument is present but the filenames bit isn’t set, the handle argument is present but the handles bit isn’t set, etc…)
- class Destination(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
IntEnum
Bits in the destination_bitv bitvector. Used to indicate the output destination.
- ALL_LOG_DESTINATIONS = 31
- ALL_OUTPUT_DESTINATIONS = 143
- FILEHANDLES = 64
- FILENAMES = 32
- HLOG = 128
- LOG_CRITICAL = 16
- LOG_DEBUG = 1
- LOG_ERROR = 8
- LOG_INFO = 2
- LOG_WARNING = 4
- print(*args, **kwargs)[source]
Produce some output to all sinks. Use the same arguments as the print-builtin.
- Raises:
TypeError – Illegal argument types encountered
- set_destination_bitv(destination_bitv: int)[source]
Change the output destination_bitv to the one provided.
- Parameters:
destination_bitv (int) – the new destination bitvector to set.
- Raises:
ValueError – invalid combination of arguments (e.g. the filenames argument is present but the filenames bit isn’t set, the handle argument is present but the handles bit isn’t set, etc…)
- class pyutils.logging_utils.OutputMultiplexerContext(destination_bitv: Destination, *, logger=None, filenames=None, handles=None)[source]
Bases:
OutputMultiplexer
,ContextDecorator
A context that uses an OutputMultiplexer. e.g.:
with OutputMultiplexerContext(
    OutputMultiplexer.LOG_INFO | OutputMultiplexer.LOG_DEBUG |
    OutputMultiplexer.FILENAMES | OutputMultiplexer.FILEHANDLES,
    filenames = [ '/tmp/foo.log', '/var/log/bar.log' ],
    handles = [ f, g ]
) as mplex:
    mplex.print("This is a log message!")
- Parameters:
destination_bitv (Destination) – a bitvector that indicates where we should send output. See
OutputMultiplexer
for options.logger – optional logger to use for log destination messages.
filenames – optional filenames to write for filename destination messages.
handles – optional open filehandles to write for filehandle destination messages.
- class pyutils.logging_utils.PrefixAddingFilter(prefix: str, klobber: bool = False)[source]
Bases:
Filter
A filter that adds a string prefix to the log record for the formatter to later fill in. Requires a %(prefix)s in the format string.
- Parameters:
prefix (str) – the prefix string to add
klobber (bool) – should we overwrite other prefixes?
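A minimal sketch of wiring this filter up by hand (normally initialize_logging() takes care of this); the logger name and the "request-42> " prefix are arbitrary:
import logging
import sys

from pyutils import logging_utils

logger = logging.getLogger("prefix_demo")
logger.setLevel(logging.INFO)
handler = logging.StreamHandler(sys.stdout)
# The format string must contain %(prefix)s for the filter's value to land.
handler.setFormatter(logging.Formatter("%(prefix)s%(message)s"))
logger.addHandler(handler)
logger.addFilter(logging_utils.PrefixAddingFilter("request-42> "))
logger.info("handling request")  # -> request-42> handling request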
- class pyutils.logging_utils.PrependingLogAdapter(logger, extra=None)[source]
Bases:
LoggerAdapter
LoggingContext
adds prefixes and suffixes using a logging.Filter that must insert “prefix” or “suffix” members into each log record by usingPrefixAddingFilter
andSuffixAddingFilter
. This relies on the logging format string containing a %(prefix)s and a %(suffix)s to work correctly.This is an alternate approach that basically just wraps the logger in a class that has the same interface and thunks most calls down to the wrapped logger. It might be useful if you don’t want to use
LoggingContext
or its friends.>>> logger = logging.getLogger(__name__ + ".PrependingLogAdapter") >>> logger.setLevel(logging.INFO) >>> logger.addHandler(logging.StreamHandler(sys.stdout))
At this point logger doesn’t have any format string and so it is missing %(prefix)s and %(suffix)s. It also doesn’t have a PrefixAddingFilter or SuffixAddingFilter added. So using it in a LoggingContext will not work. But we can still add a prefix or suffix by just wrapping it:
>>> logger.info("TEST")
TEST
>>> log = PrependingLogAdapter.wrap_logger('prefix> ', logger)
>>> log.info("TEST")
prefix> TEST
Initialize the adapter with a logger and a dict-like object which provides contextual information. This constructor signature allows easy stacking of LoggerAdapters, if so desired.
You can effectively pass keyword arguments as shown in the following example:
adapter = LoggerAdapter(someLogger, dict(p1=v1, p2="v2"))
- process(msg, kwargs)[source]
Process the logging message and keyword arguments passed in to a logging call to insert contextual information. You can either manipulate the message itself, the keyword args or both. Return the message and kwargs modified (or not) to suit your needs.
Normally, you’ll only need to override this one method in a LoggerAdapter subclass for your specific needs.
- static wrap_logger(prefix: str, logger: Logger) LoggerAdapter [source]
Helper method around the creation of a LogAdapter that prepends a given string to every log message produced.
- Parameters:
prefix (str) – the message to prepend to every log message.
logger (Logger) – the logger whose messages to modify.
- Returns:
A new logger wrapping the old one with the given behavior. The old logger will continue to behave as usual; simply drop the reference to this wrapper when it’s no longer needed.
- Return type:
LoggerAdapter
- class pyutils.logging_utils.ProbabilisticFilter(name='')[source]
Bases:
Filter
A filter that logs messages probabilistically (i.e. randomly at some percent chance). This filter is used with a decorator (see logging_is_probabilistic()) to implement the --logging_probabilistically commandline flag.

This filter only affects logging messages from functions that have been tagged with the @logging_utils.logging_is_probabilistic decorator.
Initialize a filter.
Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.
- class pyutils.logging_utils.SquelchRepeatedMessagesFilter[source]
Bases:
Filter
A filter that only logs messages from a given site with the same (exact) message at the same logging level N times and ignores subsequent attempts to log.
This filter only affects logging messages that repeat more than a threshold number of times from functions that are tagged with the @logging_utils.squelch_repeated_log_messages decorator (see below); all others are ignored.

This functionality is enabled by default but can be disabled via the --no_logging_squelch_repeats commandline flag.

Initialize a filter.
Initialize with the name of the logger which, together with its children, will have its events allowed through the filter. If no name is specified, allow every event.
- class pyutils.logging_utils.SuffixAddingFilter(suffix: str, klobber: bool = False)[source]
Bases:
Filter
A filter that adds a string suffix to the log record for the formatter to later fill in. Requires a %(suffix)s in the format string.
- Parameters:
suffix (str) – the suffix string to add
klobber (bool) – should we overwrite other suffixes?
- pyutils.logging_utils.get_logger(name: str = '')[source]
Get the global logger
- Parameters:
name (str) –
- pyutils.logging_utils.hlog(message: str) None [source]
Write a message to the house log (syslog facility local7, priority info) by calling /usr/bin/logger. This is pretty hacky but used by a bunch of (my) code. Another way to do this would be to use --logging_syslog and --logging_syslog_facility but I can’t actually say that’s easier.

TODO: this needs to move.
- Parameters:
message (str) –
- Return type:
None
- pyutils.logging_utils.initialize_logging(logger=None) Logger [source]
Initialize logging for the program. See module level comments for information about what functionality this provides and how to enable or disable functionality via the commandline.
If you use the bootstrap.initialize() decorator on your program’s entry point, it will call this for you. See pyutils.bootstrap.initialize() for more details.
- Raises:
ValueError – if logging level is invalid
- Return type:
Logger
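A rough usage sketch; the decorator form follows the bootstrap reference above, and the main() program below is illustrative, not prescribed by the library:

    import logging

    from pyutils import bootstrap

    @bootstrap.initialize   # parses flags and, per the docs above, calls initialize_logging()
    def main() -> int:
        logging.getLogger(__name__).info('logging is configured via the --logging_* flags')
        return 0

    if __name__ == '__main__':
        main()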
- pyutils.logging_utils.logging_is_probabilistic(probability_of_logging: float) Callable [source]
A decorator that indicates that all logging statements within the scope of a particular (marked via decorator) function are not deterministic (i.e. they do not always unconditionally log) but rather are probabilistic (i.e. they log N% of the time, randomly) when the user passes the --logging_probabilistically commandline flag (which is enabled by default).

Note

This affects ALL logging statements within the marked function. If you want it to only affect a subset of logging statements, log those statements in a separate function that you invoke from within the “too large” scope and mark that separate function with the logging_is_probabilistic decorator instead.

Note that this functionality can be disabled (forcing all logged messages to produce output) via the --no_logging_probabilistically cmdline argument.
- Parameters:
probability_of_logging (float) –
- Return type:
Callable
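A hedged sketch of the decorator in use; poll_device() and the 0.3 value are made up, and whether probability_of_logging is a fraction (0.3) or a percent (30.0) should be checked against the source before copying this:

    import logging

    from pyutils import logging_utils

    logger = logging.getLogger(__name__)

    @logging_utils.logging_is_probabilistic(0.3)
    def poll_device() -> None:
        # Called in a tight loop elsewhere; with the decorator (and the default
        # --logging_probabilistically behavior) only a random subset of these
        # messages actually reach the logs.
        logger.debug('polled the device; nothing changed')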
- pyutils.logging_utils.non_zero_return_value(ret: Any) bool [source]
Special method hooked from bootstrap.py to optionally keep a system-wide record of non-zero python program exits.
- Parameters:
ret (Any) – the return value
- Return type:
bool
- pyutils.logging_utils.squelch_repeated_log_messages(squelch_after_n_repeats: int) Callable [source]
A decorator that marks a function as interested in having the logging messages that it produces be squelched (ignored) after it logs the same message more than N times.
Note
This decorator affects ALL logging messages produced within the decorated function. That said, messages must be identical in order to be squelched. For example, if the same line of code produces different messages (because of, e.g., a format string), the messages are considered to be different.
An example of this from the pyutils code itself can be found in the pyutils.ansi.fg() and pyutils.ansi.bg() methods:

    @logging_utils.squelch_repeated_log_messages(1)
    def fg(
        name: Optional[str] = "",
        red: Optional[int] = None,
        green: Optional[int] = None,
        blue: Optional[int] = None,
        *,
        force_16color: bool = False,
        force_216color: bool = False,
    ) -> str:
        ...
These methods log stuff like “Using 24-bit color strategy” which gets old really fast and fills up the logs. By decorating the methods with @logging_utils.squelch_repeated_log_messages(1) the code is requesting that its logged messages be dropped silently after the first one is produced (note the argument 1).

Users can insist that all logged messages always be reflected in the logs using the --no_logging_squelch_repeats flag but the default behavior is to allow code to request it be squelched. --logging_squelch_repeats only affects code with this decorator on it; it ignores all other code.
- Parameters:
squelch_after_n_repeats (int) – the number of repeated messages allowed to log before subsequent messages are silently dropped.
- Return type:
Callable
- pyutils.logging_utils.unhandled_top_level_exception(exc_type: type, exc_value: type, exc_tb: TracebackType) bool [source]
Special method hooked from bootstrap.py to optionally keep a system-wide record of unhandled top level exceptions.
- Parameters:
exc_type (type) – the type of the unhandled exception
exc_value (type) – the value passed to the exception’s c’tor
exc_tb (TracebackType) – the stack from where the exception was raised
- Return type:
bool
pyutils.math_utils module
Helper utilities with a mathematical / statistical focus.
- class pyutils.math_utils.NumericPopulation[source]
Bases:
object
This object stores a numeric population in a way that enables relatively fast addition of new numbers (\(O(2 log_2 n)\)) and instant access to the median value in the population (\(O(1)\)). It also provides other population summary statistics such as get_mode(), get_percentile() and get_stdev().

Note
Because this class stores a copy of all numbers added to it, it shouldn’t be used for very large populations. Consider sampling.
>>> pop = NumericPopulation()
>>> pop.add_number(1)
>>> pop.add_number(10)
>>> pop.add_number(3)
>>> len(pop)
3
>>> pop.get_median()
3
>>> pop.add_number(7)
>>> pop.add_number(5)
>>> pop.get_median()
5
>>> pop.get_mean()
5.2
>>> round(pop.get_stdev(), 1)
3.1
>>> pop.get_percentile(20)
3
>>> pop.get_percentile(60)
7
- add_number(number: int | float)[source]
Adds a number to the population. Runtime complexity of this operation is \(O(2 log_2 n)\)
- Parameters:
number (int | float) – the number to add to the population
- get_mean() float [source]
- Returns:
The mean (arithmetic mean) so far in \(O(1)\) time.
- Return type:
float
- get_median() int | float [source]
- Returns:
The median (p50) of the current population in \(O(1)\) time.
- Return type:
int | float
- get_mode() Tuple[int | float, int] [source]
- Returns:
The population mode (most common member in the population) in \(O(n)\) time.
- Return type:
Tuple[int | float, int]
- get_percentile(n: float) int | float [source]
Returns: the number at approximately the nth percentile in the population in \(O(n log_2 n)\) time (it performs a full sort). This is not the most efficient algorithm.
Not thread-safe; does caching across multiple calls without an invocation to
add_number()
for perf reasons.- Parameters:
n (float) – the percentile to compute
- Return type:
int | float
- pyutils.math_utils.gcd_float_sequence(lst: List[float]) float [source]
- Returns:
The greatest common divisor of a list of floats.
- Parameters:
lst (List[float]) – a list of operands
- Raises:
ValueError – if the list doesn’t contain at least one number.
- Return type:
float
- pyutils.math_utils.gcd_floats(a: float, b: float) float [source]
- Returns:
The greatest common divisor of a and b.
- Parameters:
a (float) – first operand
b (float) – second operand
- Return type:
float
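Neither gcd_floats() nor gcd_float_sequence() ships a doctest here, so the following is a hedged illustration; the printed values are what one would expect for exact multiples, but the implementation's tolerance for float fuzz is not documented above:

    from pyutils import math_utils

    # GCD of two floats; 0.5 divides both operands exactly, so expect 0.5.
    print(math_utils.gcd_floats(1.5, 0.5))

    # GCD of a whole list of floats; expect 2.0 for these even values.
    print(math_utils.gcd_float_sequence([4.0, 6.0, 10.0]))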
- pyutils.math_utils.is_prime(n: int) bool [source]
- Parameters:
n (int) – the number for which primeness is to be determined.
- Returns:
True if n is prime and False otherwise.
- Raises:
TypeError – if argument is not an int
- Return type:
bool
Note
Obviously(?) very slow for very large input numbers until we get quantum computers.
>>> is_prime(13)
True
>>> is_prime(22)
False
>>> is_prime(51602981)
True
- pyutils.math_utils.multiplier_to_percent(multiplier: float) float [source]
Convert a multiplicative factor into a percent change or return percentage.
- Parameters:
multiplier (float) – the multiplier for which to compute the percent change
- Return type:
float
>>> multiplier_to_percent(0.75)
-25.0
>>> multiplier_to_percent(1.0)
0.0
>>> multiplier_to_percent(1.99)
99.0
- pyutils.math_utils.percentage_to_multiplier(percent: float) float [source]
Given a percentage that represents a return or percent change (e.g. 155%), determine the factor (i.e. multiplier) needed to scale a number by that percentage (e.g. 2.55x)
- Parameters:
percent (float) – the return percent to scale by
- Return type:
float
>>> percentage_to_multiplier(155)
2.55
>>> percentage_to_multiplier(45)
1.45
>>> percentage_to_multiplier(-25)
0.75
pyutils.misc_utils module
Miscellaneous utilities.
- pyutils.misc_utils.debugger_is_attached() bool [source]
- Returns:
True if a debugger is attached, False otherwise.
- Return type:
bool
- pyutils.misc_utils.execute_probabilistically(probability_to_execute: float) bool [source]
- Parameters:
probability_to_execute (float) – the probability of returning True.
- Returns:
True with a given probability.
- Return type:
bool
>>> random.seed(22)
>>> execute_probabilistically(50.0)
False
>>> execute_probabilistically(50.0)
True
pyutils.remote_worker module
pyutils.state_tracker module
This module defines several classes (StateTracker, AutomaticStateTracker, and WaitableAutomaticStateTracker) that can be used as base classes by your code. These class patterns are meant to encapsulate and represent some state that dynamically changes and must be updated periodically. These classes update their state (either automatically or when invoked to poll) and allow their callers to wait on state changes.
See also pyutils.parallelize.thread_utils.periodically_invoke
- class pyutils.state_tracker.AutomaticStateTracker(update_ids_to_update_secs: Dict[str, float], *, override_sleep_delay: float | None = None)[source]
Bases:
StateTracker
Just like StateTracker but you don’t need to pump the heartbeat() method periodically because we create a background thread that manages periodic calling. You must call shutdown(), though, in order to terminate the update thread.

Construct an AutomaticStateTracker.
- Parameters:
update_ids_to_update_secs (Dict[str, float]) –
a dict mapping a user-defined update_id into a period (number of seconds) with which we would like this update performed. e.g.:
update_ids_to_update_secs = { 'refresh_local_state': 10.0, 'refresh_remote_state': 60.0, }
This would indicate that every 10s we would like to refresh local state whereas every 60s we’d like to refresh remote state.
override_sleep_delay (float | None) – By default, this class determines how long the background thread should sleep between automatic invocations to heartbeat() based on the period of each update type in update_ids_to_update_secs. If this argument is non-None, it overrides this computation and uses this period as the sleep in the background thread.
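To make the lifecycle concrete, here is a minimal hedged sketch of a subclass; the update id, the 60 second period, and fetch_remote_status() are invented for illustration, and the exact timing of the first update() call is up to the implementation:

    import datetime
    import time
    from typing import Optional

    from pyutils import state_tracker

    def fetch_remote_status() -> str:
        # Stand-in for whatever slow or remote call you actually need to make.
        return 'ok'

    class RemoteStatusTracker(state_tracker.AutomaticStateTracker):
        """Caches a status string, refreshed roughly every 60 seconds."""

        def __init__(self) -> None:
            # The base class owns the background thread that pumps heartbeat()
            # (and therefore update()) on this schedule.
            super().__init__({'refresh_remote_state': 60.0})
            self.status: Optional[str] = None

        def update(
            self,
            update_id: str,
            now: datetime.datetime,
            last_invocation: Optional[datetime.datetime],
        ) -> None:
            if update_id == 'refresh_remote_state':
                self.status = fetch_remote_status()

    tracker = RemoteStatusTracker()
    time.sleep(5.0)          # ...real work would happen here...
    print(tracker.status)    # may still be None if no update has run yet
    tracker.shutdown()       # required: stops the background update thread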
- class pyutils.state_tracker.StateTracker(update_ids_to_update_secs: Dict[str, float])[source]
Bases:
ABC
A base class that maintains and updates its state via an update routine called heartbeat(). This method is not automatic: instances of this class should be periodically invoked via their heartbeat() method by some other thread.

See also AutomaticStateTracker if you’d rather not have to invoke your code regularly.

The update_ids_to_update_secs dict parameter describes one or more update types (unique update_ids) and the periodicity(ies), in seconds, at which it/they should be invoked.
Note
When more than one update is overdue, they will be invoked in order by their update_ids so care in choosing these identifiers may be in order.
- Parameters:
update_ids_to_update_secs (Dict[str, float]) –
a dict mapping a user-defined update_id into a period (number of seconds) with which we would like this update performed. e.g.:
update_ids_to_update_secs = { 'refresh_local_state': 10.0, 'refresh_remote_state': 60.0, }
This would indicate that every 10s we would like to refresh local state whereas every 60s we’d like to refresh remote state.
- heartbeat(*, force_all_updates_to_run: bool = False) None [source]
Invoke this method periodically to cause the StateTracker instance to identify and invoke any overdue updates based on the schedule passed to the c’tor. In the base StateTracker class, this method must be invoked manually by a thread from external code. Other subclasses (e.g. AutomaticStateTracker) are available that create their own updater threads (see below).

If more than one type of update (update_id) is overdue, overdue updates will be invoked in order based on their update_id.
Setting force_all_updates_to_run will invoke all updates (ordered by update_id) immediately ignoring whether or not they are due.
- Parameters:
force_all_updates_to_run (bool) –
- Return type:
None
- abstract update(update_id: str, now: datetime, last_invocation: datetime | None) None [source]
Put whatever you want here to perform your state updates.
- Parameters:
update_id (str) – the string you passed to the c’tor as a key in the update_ids_to_update_secs dict. update() will only be invoked, at most, every update_secs seconds.
now (datetime) – the approximate current timestamp at invocation time.
last_invocation (datetime | None) – the last time this operation was invoked (or None on the first invocation).
- Return type:
None
- class pyutils.state_tracker.WaitableAutomaticStateTracker(update_ids_to_update_secs: Dict[str, float], *, override_sleep_delay: float | None = None)[source]
Bases:
AutomaticStateTracker
This is an AutomaticStateTracker that exposes a wait method which will block the calling thread until the state changes with an optional timeout. The caller should check the return value of wait; it will be true if something changed and false if the wait simply timed out. If the return value is true, the instance should be reset() before wait is called again.
Example usage:
    detector = waitable_presence.WaitableAutomaticStateSubclass()
    while True:
        changed = detector.wait(timeout=60)
        if changed:
            detector.reset()
            # Figure out what changed and react somehow
        else:
            # Just a timeout; no need to reset. Maybe do something
            # else before looping up into wait again.
            pass
Construct a WaitableAutomaticStateTracker.
- Parameters:
update_ids_to_update_secs (Dict[str, float]) –
a dict mapping a user-defined update_id into a period (number of seconds) with which we would like this update performed. e.g.:
update_ids_to_update_secs = { 'refresh_local_state': 10.0, 'refresh_remote_state': 60.0, }
This would indicate that every 10s we would like to refresh local state whereas every 60s we’d like to refresh remote state.
override_sleep_delay (float | None) – By default, this class determines how long the background thread should sleep between automatic invocations to heartbeat() based on the period of each update type in update_ids_to_update_secs. If this argument is non-None, it overrides this computation and uses this period as the sleep in the background thread.
pyutils.stopwatch module
This is a stopwatch context that just times how long something took to execute.
—
A simple stopwatch decorator / context for timing things. This was factored out of decorator utils so that bootstrap.py can keep its imports lighter.
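A hedged usage sketch, assuming the module exposes a Timer context manager that yields a callable reporting elapsed seconds (verify the exact class name against the module source):

    import time

    from pyutils import stopwatch

    with stopwatch.Timer() as elapsed:
        time.sleep(0.25)      # the thing being timed
    print(f'That took about {elapsed():.2f}s.')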
pyutils.string_utils module
A bunch of utilities for dealing with strings. Based on a really great starting library from Davide Zanotti (forked from https://github.com/daveoncode/python-string-utils/tree/master/string_utils), I’ve added a pile of other string functions (see NOTICE file in the root of this project for a detailed account of what was added and changed) so hopefully it will handle all of your string needs.
—
The MIT License (MIT)
Copyright (c) 2016-2020 Davide Zanotti
Modifications Copyright (c) 2021-2023 Scott Gasch
Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
This class is based on: https://github.com/daveoncode/python-string-utils. See NOTICE in the root of this module for a detailed enumeration of what work is Davide’s and what work was added by Scott.
- class pyutils.string_utils.SprintfStdout[source]
Bases:
AbstractContextManager
A context manager that captures outputs to stdout to a buffer without printing them.
>>> with SprintfStdout() as buf:
...     print("test")
...     print("1, 2, 3")
...
>>> print(buf(), end='')
test
1, 2, 3
- pyutils.string_utils.add_cardinal_suffix(n: int)[source]
- Parameters:
n (int) – the number to return as a string with a cardinal suffix.
- Returns:
A string containing the number with its cardinal suffix.
>>> add_cardinal_suffix(123) '123rd'
>>> add_cardinal_suffix(1) '1st'
>>> add_cardinal_suffix(0) '0th'
>>> add_cardinal_suffix(-123) '-123rd'
- pyutils.string_utils.add_thousands_separator(in_str: str, *, separator_char: str = ',', places: int = 3) str [source]
- Parameters:
in_str (str) – string or number to which to add thousands separator(s)
separator_char (str) – the separator character to add (defaults to comma)
places (int) – add a separator every N places (defaults to three)
- Returns:
A numeric string with thousands separators added appropriately.
- Raises:
ValueError – a non-numeric string argument is presented
- Return type:
str
>>> add_thousands_separator('12345678')
'12,345,678'
>>> add_thousands_separator(12345678)
'12,345,678'
>>> add_thousands_separator(12345678.99)
'12,345,678.99'
>>> add_thousands_separator('test')
Traceback (most recent call last):
...
ValueError: test
- pyutils.string_utils.asciify(in_str: str) str [source]
- Parameters:
in_str (str) – the string to asciify.
- Returns:
An output string roughly equivalent to the original string where all content is ASCII-only. This is accomplished by translating all non-ascii chars into their closest possible ASCII representation (e.g. ó -> o, Ë -> E, ç -> c…).
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also to_ascii(), generate_random_alphanumeric_string().

Warning

Some chars may be lost if impossible to translate.

>>> asciify('èéùúòóäåëýñÅÀÁÇÌÍÑÓË')
'eeuuooaaeynAAACIINOE'
- pyutils.string_utils.bigrams(txt: str) Generator[str, str, None] [source]
Generates the bigrams (n=2) of the given string.
See also
ngrams()
,trigrams()
.>>> [x for x in bigrams('this is a test')] ['this is', 'is a', 'a test']
- Parameters:
txt (str) –
- Return type:
Generator[str, str, None]
- pyutils.string_utils.camel_case_to_snake_case(in_str: str, *, separator: str = '_')[source]
- Parameters:
in_str (str) – the camel case string to convert
separator (str) – the snake case separator character to use
- Returns:
A snake case string equivalent to the camel case input or the original string if it is not a valid camel case string or some other error occurs.
- Raises:
TypeError – the input argument isn’t a string
See also
is_camel_case()
,is_snake_case()
, andis_slug()
.>>> camel_case_to_snake_case('MacAddressExtractorFactory') 'mac_address_extractor_factory' >>> camel_case_to_snake_case('Luke Skywalker') 'Luke Skywalker'
- pyutils.string_utils.capitalize_first_letter(in_str: str) str [source]
- Parameters:
in_str (str) – the string to capitalize
- Returns:
in_str with the first character capitalized.
- Return type:
str
>>> capitalize_first_letter('test') 'Test' >>> capitalize_first_letter("ALREADY!") 'ALREADY!'
- pyutils.string_utils.chunk(txt: str, chunk_size: int)[source]
- Parameters:
txt (str) – a string to be chunked into evenly spaced pieces.
chunk_size (int) – the size of each chunk to make
- Returns:
The original string chunked into evenly spaced pieces.
>>> ' '.join(chunk('010011011100010110101010101010101001111110101000', 8)) '01001101 11000101 10101010 10101010 10011111 10101000'
- pyutils.string_utils.contains_html(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to check for tags in
- Returns:
True if the given string contains HTML/XML tags and False otherwise.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
strip_html()
.Warning
By design, this function matches ANY type of tag, so don’t expect to use it as an HTML validator. It’s a quick sanity check at best. See something like BeautifulSoup for a more full-featured HTML parser.
>>> contains_html('my string is <strong>bold</strong>') True >>> contains_html('my string is not bold') False
- pyutils.string_utils.dedent(in_str: str) str | None [source]
- Parameters:
in_str (str) – the string to dedent
- Returns:
A string with tab indentation removed or None on error.
- Return type:
str | None
See also
indent()
.>>> dedent(' test\n ing') 'test\ning'
- pyutils.string_utils.extract_date(in_str: Any) datetime | None [source]
Finds and extracts a date from the string, if possible.
- Parameters:
in_str (Any) – the string to extract a date from
- Returns:
a datetime if date was found, otherwise None
- Return type:
datetime | None
See also:
pyutils.datetimes.dateparse_utils
,to_date()
,is_valid_date()
,to_datetime()
,valid_datetime()
.>>> extract_date("filename.txt dec 13, 2022") datetime.datetime(2022, 12, 13, 0, 0)
>>> extract_date("Dear Santa, please get me a pony.")
- pyutils.string_utils.extract_ip(in_str: Any) str | None [source]
- Parameters:
in_str (Any) – the string from which to extract an IP address.
- Returns:
The first IP address (IPv4 or IPv6) found in in_str or None to indicate none found or an error condition.
- Return type:
str | None
See also
is_ip_v4()
,is_ip_v6()
,extract_ip_v6()
, andextract_ip_v4()
.>>> extract_ip('Attacker: 255.200.100.75') '255.200.100.75' >>> extract_ip('Remote host: 2001:db8:85a3:0000:0000:8a2e:370:7334') '2001:db8:85a3:0000:0000:8a2e:370:7334' >>> extract_ip('1.2.3')
- pyutils.string_utils.extract_ip_v4(in_str: Any) str | None [source]
- Parameters:
in_str (Any) – the string to extract an IPv4 address from.
- Returns:
The first extracted IPv4 address from in_str or None if none were found or an error occurred.
- Return type:
str | None
See also
is_ip_v4()
,is_ip_v6()
,extract_ip_v6()
, andis_ip()
.>>> extract_ip_v4(' The secret IP address: 127.0.0.1 (use it wisely) ') '127.0.0.1' >>> extract_ip_v4('Your mom dresses you funny.')
- pyutils.string_utils.extract_ip_v6(in_str: Any) str | None [source]
- Parameters:
in_str (Any) – the string from which to extract an IPv6 address.
- Returns:
The first IPv6 address found in in_str or None if no address was found or an error occurred.
- Return type:
str | None
See also
is_ip_v4()
,is_ip_v6()
,extract_ip_v4()
, andis_ip()
.>>> extract_ip_v6('IP: 2001:db8:85a3:0000:0000:8a2e:370:7334') '2001:db8:85a3:0000:0000:8a2e:370:7334' >>> extract_ip_v6("(and she's ugly too, btw)")
- pyutils.string_utils.extract_mac_address(in_str: Any, *, separator: str = ':') str | None [source]
- Parameters:
in_str (Any) – the string from which to extract a MAC address.
separator (str) – the MAC address hex byte separator to use.
- Returns:
The first MAC address found in in_str or None to indicate no match or an error.
- Return type:
str | None
See also
is_mac_address()
,is_ip()
, andextract_ip()
.>>> extract_mac_address(' MAC Address: 34:29:8F:12:0D:2F') '34:29:8F:12:0D:2F'
>>> extract_mac_address('? (10.0.0.30) at d8:5d:e2:34:54:86 on em0 expires in 1176 seconds [ethernet]') 'd8:5d:e2:34:54:86'
- pyutils.string_utils.from_base64(b64: bytes, encoding: str = 'utf-8', errors: str = 'surrogatepass') str [source]
- Parameters:
b64 (bytes) – bytestring of 64-bit encoded data to decode / convert.
encoding (str) – the encoding to use during conversion
errors (str) – how to handle encoding errors
- Returns:
The decoded form of b64 as a normal python string. Similar to and compatible with uuencode / uudecode.
- Return type:
str
See also
to_base64()
,is_base64()
.>>> from_base64(b'aGVsbG8/\n') 'hello?'
- pyutils.string_utils.from_bitstring(bits: str, encoding: str = 'utf-8', errors: str = 'surrogatepass') str [source]
- Parameters:
bits (str) – the bitstring to convert back into a python string
encoding (str) – the encoding to use during conversion
errors (str) – how to handle encoding errors
- Returns:
The regular python string represented by bits. Note that this code does not work with to_bitstring when delimiter is non-empty.
- Return type:
str
See also
to_base64()
,to_bitstring()
,is_bitstring()
,chunk()
.>>> from_bitstring('011010000110010101101100011011000110111100111111') 'hello?'
- pyutils.string_utils.from_char_list(in_list: List[str]) str [source]
- Parameters:
in_list (List[str]) – A list of characters to convert into a string.
- Returns:
The string resulting from gluing the characters in in_list together.
- Return type:
str
See also
to_char_list()
.>>> from_char_list(['t', 'e', 's', 't']) 'test'
- pyutils.string_utils.generate_random_alphanumeric_string(size: int) str [source]
- Parameters:
size (int) – number of characters to generate
- Returns:
A string of the specified size containing random characters (uppercase/lowercase ascii letters and digits).
- Raises:
ValueError – size < 1
- Return type:
str
See also
asciify()
,generate_uuid()
.>>> random.seed(22) >>> generate_random_alphanumeric_string(9) '96ipbNClS'
- pyutils.string_utils.generate_uuid(omit_dashes: bool = False) str [source]
- Parameters:
omit_dashes (bool) – should we omit the dashes in the generated UUID?
- Returns:
A generated UUID string (using uuid.uuid4()) with or without dashes per the omit_dashes arg.
- Return type:
str
See also is_uuid(), generate_random_alphanumeric_string().

    generate_uuid()                   # possible output: '97e3a716-6b33-4ab9-9bb1-8128cb24d76b'
    generate_uuid(omit_dashes=True)   # possible output: '97e3a7166b334ab99bb18128cb24d76b'
- pyutils.string_utils.get_cardinal_suffix(n: int) str
- Parameters:
n (int) – how many of them are there?
- Returns:
The proper cardinal suffix for a number.
- Return type:
str
See also
it_they()
,is_are()
,make_contractions()
.Suggested usage:
    attempt_count = 0
    while True:
        attempt_count += 1
        if try_the_thing():
            break
        print(f'The {attempt_count}{thify(attempt_count)} failed, trying again.')

>>> thify(1)
'st'
>>> thify(33)
'rd'
>>> thify(16)
'th'
- pyutils.string_utils.indent(in_str: str, amount: int) str [source]
- Parameters:
in_str (str) – the string to indent
amount (int) – count of spaces to indent each line by
- Returns:
An indented string created by prepending amount spaces.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also
dedent()
.>>> indent('This is a test', 4) ' This is a test'
- pyutils.string_utils.integer_to_number_string(num: int) str [source]
Opposite of number_string_to_integer(); converts a number to a written out longhand format in English.
- Parameters:
num (int) – the integer number to convert
- Returns:
The long-hand written out English form of the number. See examples below.
- Return type:
str
See also
number_string_to_integer()
.Warning
This method does not handle decimals or floats, only ints.
>>> integer_to_number_string(9) 'nine'
>>> integer_to_number_string(42) 'forty two'
>>> integer_to_number_string(123219982) 'one hundred twenty three million two hundred nineteen thousand nine hundred eighty two'
- pyutils.string_utils.interpolate_using_dict(txt: str, values: Dict[str, str]) str [source]
Interpolate a string with data from a dict.
- Parameters:
txt (str) – the mad libs template
values (Dict[str, str]) – what you and your kids chose for each category.
- Return type:
str
See also
shuffle_columns_into_list()
,shuffle_columns_into_dict()
.>>> interpolate_using_dict('This is a {adjective} {noun}.', ... {'adjective': 'good', 'noun': 'example'}) 'This is a good example.'
- pyutils.string_utils.ip_v4_sort_key(txt: str) Tuple[int, ...] | None [source]
- Parameters:
txt (str) – an IP address to chunk up for sorting purposes
- Returns:
A tuple of IP components arranged such that the sorting of IP addresses using a normal comparator will do something sane and desirable.
- Return type:
Tuple[int, …] | None
See also
is_ip_v4()
.>>> ip_v4_sort_key('10.0.0.18') (10, 0, 0, 18)
>>> ips = ['10.0.0.10', '100.0.0.1', '1.2.3.4', '10.0.0.9'] >>> sorted(ips, key=lambda x: ip_v4_sort_key(x)) ['1.2.3.4', '10.0.0.9', '10.0.0.10', '100.0.0.1']
- pyutils.string_utils.is_are(n: int) str [source]
- Parameters:
n (int) – how many of them are there?
- Returns:
‘is’ if n is one or ‘are’ otherwise.
- Return type:
str
See also
it_they()
,pluralize()
,make_contractions()
,thify()
.Suggested usage:
    n = num_files_saved_to_tmp()
    print(f'Saved file{pluralize(n)} successfully.')
    print(f'{it_they(n)} {is_are(n)} located in /tmp.')

>>> is_are(1)
'is'
>>> is_are(2)
'are'
- pyutils.string_utils.is_base64(txt: str) bool [source]
- Parameters:
txt (str) – the string to check
- Returns:
True if txt is a valid base64 encoded string. This assumes txt was encoded with Python’s standard base64 alphabet (which is the same as what uuencode/uudecode uses).
- Return type:
bool
See also
to_base64()
,from_base64()
.>>> is_base64('test') # all letters in the b64 alphabet True
>>> is_base64('another test, how do you like this one?') False
>>> is_base64(b'aGVsbG8/\n') # Ending newline is ok. True
- pyutils.string_utils.is_binary_integer_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to test
- Returns:
True if the string contains a binary integral number and False otherwise.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
is_integer_number()
,is_decimal_number()
,is_hexidecimal_integer_number()
,is_octal_integer_number()
, etc…>>> is_binary_integer_number('0b10111') True >>> is_binary_integer_number('-0b111') True >>> is_binary_integer_number('0B10101') True >>> is_binary_integer_number('0b10102') False >>> is_binary_integer_number('0xFFF') False >>> is_binary_integer_number('test') False
- pyutils.string_utils.is_bitstring(txt: str) bool [source]
- Parameters:
txt (str) – the string to check
- Returns:
True if txt is a recognized bitstring and False otherwise. Note that if delimiter is non empty this code will not recognize the bitstring.
- Return type:
bool
See also
to_base64()
,from_bitstring()
,to_bitstring()
,chunk()
.>>> is_bitstring('011010000110010101101100011011000110111100111111') True
>>> is_bitstring('1234') False
- pyutils.string_utils.is_camel_case(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if the string is formatted as camel case and False otherwise. A string is considered camel case when:
it’s composed only by letters ([a-zA-Z]) and optionally numbers ([0-9])
it contains both lowercase and uppercase letters
it does not start with a number
- Return type:
bool
See also is_snake_case(), is_slug(), and camel_case_to_snake_case().
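This entry has no doctest; the following hedged illustration applies the rules listed above (the expected values follow from those rules rather than from a captured run):

    from pyutils import string_utils

    print(string_utils.is_camel_case('MacAddressExtractorFactory'))  # expect True
    print(string_utils.is_camel_case('mac_address_extractor'))       # expect False: contains underscores
    print(string_utils.is_camel_case('lowercaseonly'))               # expect False: no uppercase letters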
- pyutils.string_utils.is_credit_card(in_str: Any, card_type: str | None = None) bool [source]
- Parameters:
in_str (Any) – a string to check
card_type (str | None) –
if provided, contains the card type to validate with. Otherwise, all known credit card number types will be accepted.
Supported card types are the following:
VISA
MASTERCARD
AMERICAN_EXPRESS
DINERS_CLUB
DISCOVER
JCB
- Returns:
True if in_str is a valid credit card number.
- Raises:
KeyError – card_type is invalid
- Return type:
bool
Warning
This code is not verifying the authenticity of the credit card (i.e. not checking whether it’s a real card that can be charged); rather it’s only checking that the number follows the “rules” for numbering established by credit card issuers.
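There is no doctest here either; as a hedged illustration, the well-known VISA test number below satisfies the issuer numbering rules (it is a standard test number, not a real card), and the card_type spelling follows the list above:

    from pyutils import string_utils

    # 4111111111111111 is the canonical VISA test number (it passes the Luhn check).
    print(string_utils.is_credit_card('4111111111111111'))                    # expect True
    print(string_utils.is_credit_card('4111111111111111', card_type='VISA'))  # expect True
    print(string_utils.is_credit_card('1234567890123456'))                    # expect False: fails the Luhn check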
- pyutils.string_utils.is_decimal_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to check
- Returns:
True if the given string represents a decimal or False otherwise. A decimal may be signed or unsigned or use a “scientific notation”.
- Return type:
bool
See also
is_integer_number()
.Note
We do not consider integers without a decimal point to be decimals; they return False (see example).
>>> is_decimal_number('42.0') True >>> is_decimal_number('42') False
- pyutils.string_utils.is_email(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the email address to check
- Return type:
bool
- Returns: True if the in_str contains a valid email (as defined by
https://tools.ietf.org/html/rfc3696#section-3) or False otherwise.
>>> is_email('[email protected]') True >>> is_email('@gmail.com') False
- pyutils.string_utils.is_empty(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if the string is empty and false otherwise.
- Return type:
bool
See also
is_none_or_empty()
,is_full_string()
.>>> is_empty('') True >>> is_empty(' ') True >>> is_empty('test') False >>> is_empty(100.88) False >>> is_empty([1, 2, 3]) False
- pyutils.string_utils.is_empty_string(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if the string is empty and False otherwise.
- Return type:
bool
See also is_none_or_empty(), is_full_string().
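No example accompanies this one; a hedged illustration, mirroring the is_empty() doctest above:

    from pyutils import string_utils

    print(string_utils.is_empty_string(''))       # expect True
    print(string_utils.is_empty_string('   '))    # expect True: whitespace only, mirroring is_empty(' ')
    print(string_utils.is_empty_string('hello'))  # expect False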
- pyutils.string_utils.is_full_string(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the object to test
- Returns:
True if the object is a string and is not empty (‘’) and is not only composed of whitespace.
- Return type:
bool
See also
is_string()
,is_empty_string()
,is_none_or_empty()
.>>> is_full_string('test!') True >>> is_full_string('') False >>> is_full_string(' ') False >>> is_full_string(100.999) False >>> is_full_string({"a": 1, "b": 2}) False
- pyutils.string_utils.is_hexidecimal_integer_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to test
- Returns:
True if the string is a hex integer number and False otherwise.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
is_integer_number()
,is_decimal_number()
,is_octal_integer_number()
,is_binary_integer_number()
, etc…
>>> is_hexidecimal_integer_number('0x12345')
True
>>> is_hexidecimal_integer_number('0x1A3E')
True
>>> is_hexidecimal_integer_number('1234')  # Needs 0x
False
>>> is_hexidecimal_integer_number('-0xff')
True
>>> is_hexidecimal_integer_number('test')
False
>>> is_hexidecimal_integer_number(12345)  # Not a string
Traceback (most recent call last):
...
TypeError: 12345
>>> is_hexidecimal_integer_number(101.4)
Traceback (most recent call last):
...
TypeError: 101.4
>>> is_hexidecimal_integer_number(0x1A3E)
Traceback (most recent call last):
...
TypeError: 6718
- pyutils.string_utils.is_integer_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to test
- Returns:
True if the string contains a valid (signed or unsigned, decimal, hex, or octal, regular or scientific) integral expression and False otherwise.
- Return type:
bool
See also
is_number()
,is_decimal_number()
,is_hexidecimal_integer_number()
,is_octal_integer_number()
, etc…>>> is_integer_number('42') True >>> is_integer_number('42.0') False
- pyutils.string_utils.is_ip(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test.
- Returns:
True if in_str contains a valid IP address (either IPv4 or IPv6).
- Return type:
bool
See also
is_ip_v4()
,is_ip_v6()
,extract_ip_v6()
, andextract_ip_v4()
.>>> is_ip('255.200.100.75') True >>> is_ip('2001:db8:85a3:0000:0000:8a2e:370:7334') True >>> is_ip('1.2.3') False >>> is_ip('1.2.3.999') False
- pyutils.string_utils.is_ip_v4(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if in_str contains a valid IPv4 address and False otherwise.
- Return type:
bool
See also
extract_ip_v4()
,is_ip_v6()
,extract_ip_v6()
, andis_ip()
.>>> is_ip_v4('255.200.100.75') True >>> is_ip_v4('nope') False >>> is_ip_v4('255.200.100.999') # 999 out of range False
- pyutils.string_utils.is_ip_v6(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test.
- Returns:
True if in_str contains a valid IPv6 address and False otherwise.
- Return type:
bool
See also
is_ip_v4()
,extract_ip_v4()
,extract_ip_v6()
, andis_ip()
.>>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:7334') True >>> is_ip_v6('2001:db8:85a3:0000:0000:8a2e:370:?') # invalid "?" False
- pyutils.string_utils.is_json(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if the in_str contains valid JSON and False otherwise.
- Return type:
bool
>>> is_json('{"name": "Peter"}') True >>> is_json('[1, 2, 3]') True >>> is_json('{nope}') False
- pyutils.string_utils.is_mac_address(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the string to test
- Returns:
True if in_str is a valid MAC address, False otherwise.
- Return type:
bool
See also
extract_mac_address()
,is_ip()
, etc…>>> is_mac_address("34:29:8F:12:0D:2F") True >>> is_mac_address('34:29:8f:12:0d:2f') True >>> is_mac_address('34-29-8F-12-0D-2F') True >>> is_mac_address("test") False
- pyutils.string_utils.is_none_or_empty(in_str: str | None) bool [source]
- Parameters:
in_str (str | None) – the string to test
- Returns:
True if the input string is either None or an empty string, False otherwise.
- Return type:
bool
See also
is_string()
andis_empty_string()
.>>> is_none_or_empty("") True >>> is_none_or_empty(None) True >>> is_none_or_empty(" ") True >>> is_none_or_empty('Test') False
- pyutils.string_utils.is_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to test
- Returns:
True if the string contains a valid numeric value and False otherwise.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
is_integer_number()
,is_decimal_number()
,is_hexidecimal_integer_number()
,is_octal_integer_number()
, etc…
>>> is_number(100.5)
Traceback (most recent call last):
...
TypeError: 100.5
>>> is_number("100.5")
True
>>> is_number("test")
False
>>> is_number("99")
True
>>> is_number([1, 2, 3])
Traceback (most recent call last):
...
TypeError: [1, 2, 3]
- pyutils.string_utils.is_octal_integer_number(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to test
- Returns:
True if the string is a valid octal integral number and False otherwise.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
is_integer_number()
,is_decimal_number()
,is_hexidecimal_integer_number()
,is_binary_integer_number()
, etc…>>> is_octal_integer_number('0o777') True >>> is_octal_integer_number('-0O115') True >>> is_octal_integer_number('0xFF') # Not octal, needs 0o False >>> is_octal_integer_number('7777') # Needs 0o False >>> is_octal_integer_number('test') False
- pyutils.string_utils.is_slug(in_str: Any, separator: str = '-') bool [source]
- Parameters:
in_str (Any) – string to test
separator (str) – the slug character to use
- Returns:
True if in_str is a slug string and False otherwise.
- Return type:
bool
See also
is_camel_case()
,is_snake_case()
, andslugify()
.>>> is_slug('my-blog-post-title') True >>> is_slug('My blog post title') False
- pyutils.string_utils.is_snake_case(in_str: Any, *, separator: str = '_') bool [source]
- Parameters:
in_str (Any) – the string to test
separator (str) – the snake case separator character to use
- Return type:
bool
- Returns: True if the string is snake case and False otherwise. A
string is considered snake case when:
it’s composed only by lowercase/uppercase letters and digits
it contains at least one underscore (or provided separator)
it does not start with a number
See also
is_camel_case()
,is_slug()
, andsnake_case_to_camel_case()
.>>> is_snake_case('this_is_a_test') True >>> is_snake_case('___This_Is_A_Test_1_2_3___') True >>> is_snake_case('this-is-a-test') False >>> is_snake_case('this-is-a-test', separator='-') True
- pyutils.string_utils.is_string(in_str: Any) bool [source]
- Parameters:
in_str (Any) – the object to test
- Returns:
True if the object is a string and False otherwise.
- Return type:
bool
See also
is_empty_string()
,is_none_or_empty()
.>>> is_string('test') True >>> is_string(123) False >>> is_string(100.3) False >>> is_string([1, 2, 3]) False
- pyutils.string_utils.is_url(in_str: Any, allowed_schemes: List[str] | None = None) bool [source]
- Parameters:
in_str (Any) – the string to test
allowed_schemes (List[str] | None) – an optional list of allowed schemes (e.g. ['http', 'https', 'ftp']). If passed, only URLs that begin with one of the schemes passed will be considered to be valid. Otherwise, any scheme:// will be considered valid.
- Returns:
True if in_str contains a valid URL and False otherwise.
- Return type:
bool
>>> is_url('http://www.mysite.com')
True
>>> is_url('https://mysite.com')
True
>>> is_url('.mysite.com')
False
>>> is_url('scheme://username:[email protected]:8042/folder/subfolder/file.extension?param=value&param2=value2#hash')
True
- pyutils.string_utils.is_uuid(in_str: Any, allow_hex: bool = False) bool [source]
- Parameters:
in_str (Any) – the string to test
allow_hex (bool) – should we allow hexidecimal digits in valid uuids?
- Returns:
True if the in_str contains a valid UUID and False otherwise.
- Return type:
bool
See also
generate_uuid()
.>>> is_uuid('6f8aa2f9-686c-4ac3-8766-5712354a04cf') True >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf') False >>> is_uuid('6f8aa2f9686c4ac387665712354a04cf', allow_hex=True) True
- pyutils.string_utils.is_valid_date(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to check
- Returns:
True if the string represents a valid date that we can recognize and False otherwise. This parser is relatively clever; see
datetimes.dateparse_utils
docs for details.- Return type:
bool
See also:
pyutils.datetimes.dateparse_utils
,to_date()
,extract_date()
,to_datetime()
,valid_datetime()
.>>> is_valid_date('1/2/2022') True >>> is_valid_date('christmas') True >>> is_valid_date('next wednesday') True >>> is_valid_date('xyzzy') False
- pyutils.string_utils.it_they(n: int) str [source]
- Parameters:
n (int) – how many of them are there?
- Returns:
‘it’ if n is one or ‘they’ otherwise.
- Return type:
str
See also
is_are()
,pluralize()
,make_contractions()
,thify()
.Suggested usage:
    n = num_files_saved_to_tmp()
    print(f'Saved file{pluralize(n)} successfully.')
    print(f'{it_they(n)} {is_are(n)} located in /tmp.')

>>> it_they(1)
'it'
>>> it_they(100)
'they'
- pyutils.string_utils.make_contractions(txt: str) str [source]
This code glues words in txt together to form (English) contractions.
- Parameters:
txt (str) – the input text to be contractionized.
- Returns:
Output text identical to the original input except that any recognized contractions are formed.
- Return type:
str
See also
it_they()
,is_are()
,make_contractions()
.Note
The order in which we create contractions is defined by the implementation and what I thought made more sense when writing this code.
>>> make_contractions('It is nice today.') "It's nice today."
>>> make_contractions('I can not even...') "I can't even..."
>>> make_contractions('She could not see!') "She couldn't see!"
>>> make_contractions('But she will not go.') "But she won't go."
>>> make_contractions('Verily, I shall not.') "Verily, I shan't."
>>> make_contractions('No you cannot.') "No you can't."
>>> make_contractions('I said you can not go.') "I said you can't go."
- pyutils.string_utils.ngrams(txt: str, n: int) Generator[str, str, None] [source]
- Parameters:
txt (str) – the string to create ngrams using
n (int) – how many words per ngram created?
- Returns:
Generates the ngrams from the input string.
- Return type:
Generator[str, str, None]
See also
ngrams_presplit()
,bigrams()
,trigrams()
.>>> [x for x in ngrams('This is a test', 2)] ['This is', 'is a', 'a test']
- pyutils.string_utils.ngrams_presplit(words: Sequence[str], n: int) Generator[Sequence[str], str, None] [source]
Same as
ngrams()
but with the string pre-split.See also
ngrams()
,bigrams()
,trigrams()
.- Parameters:
words (Sequence[str]) –
n (int) –
- Return type:
Generator[Sequence[str], str, None]
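There is no example for the pre-split variant; a hedged sketch mirroring the ngrams() doctest above (the exact sequence type of each yielded ngram is up to the implementation):

    from pyutils import string_utils

    words = 'This is a test'.split()
    # Each yielded ngram is a subsequence of the pre-split words rather than a joined string.
    print(list(string_utils.ngrams_presplit(words, 2)))
    # expect something like: [('This', 'is'), ('is', 'a'), ('a', 'test')]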
- pyutils.string_utils.normalize_punctuation(in_str: str) str [source]
- Parameters:
in_str (str) – the string to normalize.
- Returns:
An output string roughly equivalent to the original string where all punctuation marks are normalized to use the same ASCII character.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also
to_ascii()
,asciify()
,normalize_whitespace()
>>> normalize_punctuation('“hello,” said the cat; wow!') '"hello," said the cat; wow!'
- pyutils.string_utils.normalize_whitespace(in_str: str) str [source]
- Parameters:
in_str (str) – the string to normalize.
- Returns:
An output string roughly equivalent to the original string where whitespace characters are converted into the ASCII space character (0x20).
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also
to_ascii()
,asciify()
,normalize_punctuation()
>>> normalize_whitespace('testing 123') 'testing 123'
- pyutils.string_utils.number_string_to_integer(in_str: str) int [source]
Convert a string containing a written-out number into an int.
- Parameters:
in_str (str) – the string containing the long-hand written out integer number in English. See examples below.
- Returns:
The integer whose value was parsed from in_str.
- Raises:
ValueError – unable to parse a chunk of the number string
- Return type:
int
See also
integer_to_number_string()
.Warning
This code only handles integers; it will not work with decimals / floats.
>>> number_string_to_integer("one hundred fifty two") 152
>>> number_string_to_integer("ten billion two hundred million fifty four thousand three") 10200054003
>>> number_string_to_integer("four-score and 7") 87
>>> number_string_to_integer("fifty xyzzy three") Traceback (most recent call last): ... ValueError: Unknown word: xyzzy
- pyutils.string_utils.number_to_suffix_string(num: int) str | None [source]
Take a number (of bytes) and returns a string like “43.8Gb”.
- Parameters:
num (int) – an integer number of bytes
- Returns:
A string with a suffix representing num bytes concisely or None to indicate an error.
- Return type:
str | None
See also:
suffix_string_to_number()
.>>> number_to_suffix_string(14066017894) '13.1Gb' >>> number_to_suffix_string(1024 * 1024) '1.0Mb'
- pyutils.string_utils.path_ancestors_before_descendants_sort_key(volume: str) Tuple[str, ...] [source]
- Parameters:
volume (str) – the string to chunk up for sorting purposes
- Returns:
A tuple of volume’s components such that the sorting of volumes using a normal comparator will do something sane and desirable.
- Return type:
Tuple[str, …]
See also
pyutils.files.file_utils
.>>> path_ancestors_before_descendants_sort_key('/usr/local/bin') ('usr', 'local', 'bin')
>>> paths = ['/usr/local', '/usr/local/bin', '/usr'] >>> sorted(paths, key=lambda x: path_ancestors_before_descendants_sort_key(x)) ['/usr', '/usr/local', '/usr/local/bin']
- pyutils.string_utils.pluralize(n: int) str [source]
- Parameters:
n (int) – how many of them are there?
- Returns:
‘s’ if n is greater than one, otherwise ‘’.
- Return type:
str
See also
it_they()
,is_are()
,make_contractions()
,thify()
.Suggested usage:
    n = num_files_saved_to_tmp()
    print(f'Saved file{pluralize(n)} successfully.')
    print(f'{it_they(n)} {is_are(n)} located in /tmp.')

>>> pluralize(15)
's'
>>> count = 1
>>> print(f'There {is_are(count)} {count} file{pluralize(count)}.')
There is 1 file.
>>> count = 4
>>> print(f'There {is_are(count)} {count} file{pluralize(count)}.')
There are 4 files.
- pyutils.string_utils.remove_cardinal_suffix(txt: str) str | None [source]
- Parameters:
txt (str) – the number with cardinal suffix to strip.
- Returns:
The same string with its cardinal suffix removed or None on error.
- Return type:
str | None
>>> remove_cardinal_suffix('123rd') '123'
>>> remove_cardinal_suffix('-10th') '-10'
>>> remove_cardinal_suffix('1ero') is None True
- pyutils.string_utils.replace_all(in_str: str, replace_set: str, replacement: str) str [source]
Execute several replace operations in a row.
- Parameters:
in_str (str) – the string in which to replace characters
replace_set (str) – the set of target characters to replace
replacement (str) – the character to replace any member of replace_set with
- Return type:
str
See also
replace_nth()
.- Returns:
The string with replacements executed.
>>> s = 'this_is a-test!' >>> replace_all(s, ' _-!', '') 'thisisatest'
- pyutils.string_utils.replace_nth(in_str: str, source: str, target: str, nth: int)[source]
Replaces the nth occurrence of a substring within a string.
- Parameters:
in_str (str) – the string in which to run the replacement
source (str) – the substring to replace
target (str) – the replacement text
nth (int) – which occurrence of source to replace?
See also
replace_all()
.>>> replace_nth('this is a test', ' ', '-', 3) 'this is a-test'
- pyutils.string_utils.reverse(in_str: str) str [source]
- Parameters:
in_str (str) – the string to reverse
- Returns:
The reversed (character by character) string.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
>>> reverse('test') 'tset'
- pyutils.string_utils.scramble(in_str: str) str | None [source]
- Parameters:
in_str (str) – a string to shuffle randomly by character
- Returns:
A new string containing same chars of the given one but in a randomized order. Note that in rare cases this could result in the same original string as no check is done. Returns None to indicate error conditions.
- Return type:
str | None
See also
pyutils.unscrambler
.>>> random.seed(22) >>> scramble('awesome') 'meosaew'
- pyutils.string_utils.shuffle(in_str: str) str | None [source]
- Parameters:
in_str (str) – a string to shuffle randomly by character
- Returns:
A new string containing same chars of the given one but in a randomized order. Note that in rare cases this could result in the same original string as no check is done. Returns None to indicate error conditions.
- Return type:
str | None
>>> random.seed(22) >>> shuffle('awesome') 'meosaew'
- pyutils.string_utils.shuffle_columns_into_dict(input_lines: Sequence[str], column_specs: Iterable[Tuple[str, Iterable[int]]], delim: str = '') Dict[str, str] [source]
Helper to shuffle / parse columnar data and return the results as a dict.
- Parameters:
input_lines (Sequence[str]) – a sequence of strings that represents text that has been broken into columns by the caller
column_specs (Iterable[Tuple[str, Iterable[int]]]) – instructions for what dictionary keys to apply to individual or compound input column data. See example below.
delim (str) – when forming compound output data by gluing more than one input column together, use this character to separate the source data. Defaults to ‘’.
- Returns:
A dict formed by applying the column_specs instructions.
- Return type:
Dict[str, str]
See also
shuffle_columns_into_list()
,interpolate_using_dict()
.>>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split() >>> shuffle_columns_into_dict( ... cols, ... [ ('filename', [8]), ('owner', [2, 3]), ('mtime', [5, 6, 7]) ], ... delim='!', ... ) {'filename': 'acl_test.py', 'owner': 'scott!wheel', 'mtime': 'Jul!9!11:34'}
- pyutils.string_utils.shuffle_columns_into_list(input_lines: Sequence[str], column_specs: Iterable[Iterable[int]], delim: str = '') Iterable[str] [source]
Helper to shuffle / parse columnar data and return the results as a list.
- Parameters:
input_lines (Sequence[str]) – A sequence of strings that represents text that has been broken into columns by the caller
column_specs (Iterable[Iterable[int]]) – an iterable collection of numeric sequences that indicate one or more column numbers to copy to form the Nth position in the output list. See example below.
delim (str) – for column_specs that indicate we should copy more than one column from the input into this position, use delim to separate source data. Defaults to ‘’.
- Returns:
A list of string created by following the instructions set forth in column_specs.
- Return type:
Iterable[str]
See also
shuffle_columns_into_dict()
.>>> cols = '-rwxr-xr-x 1 scott wheel 3.1K Jul 9 11:34 acl_test.py'.split() >>> shuffle_columns_into_list( ... cols, ... [ [8], [2, 3], [5, 6, 7] ], ... delim='!', ... ) ['acl_test.py', 'scott!wheel', 'Jul!9!11:34']
- pyutils.string_utils.slugify(in_str: str, *, separator: str = '-') str [source]
- Parameters:
in_str (str) – the string to slugify
separator (str) – the character to use during slugification (default is a dash)
- Returns:
The converted string. The returned string has the following properties:
it has no spaces
all letters are in lower case
all punctuation signs and non alphanumeric chars are removed
words are divided using the provided separator
all chars are encoded as ascii (by using asciify())
it is safe for URL
- Return type:
str
- Raises:
TypeError – the input argument isn’t a string
See also
is_slug()
andasciify()
.>>> slugify('Top 10 Reasons To Love Dogs!!!') 'top-10-reasons-to-love-dogs' >>> slugify('Mönstér Mägnët') 'monster-magnet'
- pyutils.string_utils.snake_case_to_camel_case(in_str: str, *, upper_case_first: bool = True, separator: str = '_') str [source]
- Parameters:
in_str (str) – the snake case string to convert
upper_case_first (bool) – should we capitalize the first letter?
separator (str) – the separator character to use
- Returns:
A camel case string that is equivalent to the snake case string provided or the original string back again if it is not valid snake case or another error occurs.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also
is_camel_case(), is_snake_case(), and is_slug().
>>> snake_case_to_camel_case('this_is_a_test')
'ThisIsATest'
>>> snake_case_to_camel_case('Han Solo')
'Han Solo'
- pyutils.string_utils.squeeze(in_str: str, character_to_squeeze: str = ' ') str [source]
- Parameters:
in_str (str) – the string to squeeze
character_to_squeeze (str) – the character to remove runs of more than one in a row (default = space)
- Return type:
str
- Returns:
A “squeezed” string where runs of more than one adjacent character_to_squeeze have been collapsed into a single occurrence.
>>> squeeze(' this is a test ') ' this is a test '
>>> squeeze('one|!||!|two|!||!|three', character_to_squeeze='|!|') 'one|!|two|!|three'
- pyutils.string_utils.strip_ansi_sequences(in_str: str) str [source]
- Parameters:
in_str (str) – the string to strip
- Returns:
in_str with recognized ANSI escape sequences removed.
- Return type:
str
See also
pyutils.ansi
.Warning
This method works by using a regular expression. It works for all ANSI escape sequences I’ve tested with but may miss some; caveat emptor.
>>> import ansi as a
>>> s = a.fg('blue') + 'blue!' + a.reset()
>>> len(s)  # '[38;5;21mblue![m'
18
>>> len(strip_ansi_sequences(s))
5
>>> strip_ansi_sequences(s)
'blue!'
- pyutils.string_utils.strip_escape_sequences(in_str: str) str [source]
- Parameters:
in_str (str) – the string to strip of escape sequences.
- Returns:
in_str with escape sequences removed.
- Return type:
str
See also:
pyutils.ansi
.Note
What is considered to be an “escape sequence” is defined by a regular expression. While this gets common ones, there may exist valid sequences that it doesn’t match.
>>> strip_escape_sequences('[12;11;22mthis is a test!') 'this is a test!'
- pyutils.string_utils.strip_html(in_str: str, keep_tag_content: bool = False) str [source]
- Parameters:
in_str (str) – the string to strip tags from
keep_tag_content (bool) – should we keep the inner contents of tags?
- Returns:
A string with all HTML tags removed (optionally with tag contents preserved).
- Raises:
TypeError – the input argument isn’t a string
- Return type:
str
See also
contains_html()
.Note
This method uses simple regular expressions to strip tags and is not a full-fledged HTML parser by any means. Consider using something like BeautifulSoup if your needs are more than this simple code can fulfill.
>>> strip_html('test: <a href="foo/bar">click here</a>')
'test: '
>>> strip_html('test: <a href="foo/bar">click here</a>', keep_tag_content=True)
'test: click here'
- pyutils.string_utils.suffix_string_to_number(in_str: str) int | None [source]
Takes a string like “33Mb” and converts it into a number (of bytes) like 34603008.
- Parameters:
in_str (str) – the string with a suffix to be interpreted and removed.
- Returns:
An integer number of bytes or None to indicate an error.
- Return type:
int | None
See also
number_to_suffix_string().
>>> suffix_string_to_number('1Mb')
1048576
>>> suffix_string_to_number('13.1Gb')
14066017894
>>> suffix_string_to_number('12345')
12345
>>> x = suffix_string_to_number('a lot')
>>> x is None
True
- pyutils.string_utils.thify(n: int) str [source]
- Parameters:
n (int) – how many of them are there?
- Returns:
The proper ordinal suffix for a number.
- Return type:
str
See also
it_they(), is_are(), make_contractions().
Suggested usage:
attempt_count = 0
while True:
    attempt_count += 1
    if try_the_thing():
        break
    print(f'The {attempt_count}{thify(attempt_count)} failed, trying again.')
>>> thify(1)
'st'
>>> thify(33)
'rd'
>>> thify(16)
'th'
- pyutils.string_utils.to_ascii(txt: str)[source]
- Parameters:
txt (str) – the input data to encode
- Returns:
txt encoded as an ASCII byte string.
- Raises:
TypeError – the input argument isn’t a string or bytes
See also
to_base64()
,to_bitstring()
,to_bytes()
,generate_random_alphanumeric_string()
,asciify()
.>>> to_ascii('test') b'test'
>>> to_ascii(b'1, 2, 3') b'1, 2, 3'
- pyutils.string_utils.to_base64(txt: str, *, encoding: str = 'utf-8', errors: str = 'surrogatepass') bytes [source]
- Parameters:
txt (str) – the input data to encode
encoding (str) – the encoding to use during conversion
errors (str) – how to handle encoding errors
- Returns:
txt encoded with a 64-character alphabet. Similar to and compatible with uuencode/uudecode.
- Return type:
bytes
See also
is_base64()
,to_ascii()
,to_bitstring()
,from_base64()
.>>> to_base64('hello?') b'aGVsbG8/\n'
- pyutils.string_utils.to_bitstring(txt: str, *, delimiter: str = '') str [source]
- Parameters:
txt (str) – the string to convert into a bitstring
delimiter (str) – character to insert between adjacent bytes. Note that only bitstrings with delimiter=’’ are interpretable by
from_bitstring()
.
- Returns:
txt converted to ascii/binary and then chopped into bytes.
- Return type:
str
See also
to_base64()
,from_bitstring()
,is_bitstring()
,chunk()
.>>> to_bitstring('hello?') '011010000110010101101100011011000110111100111111'
>>> to_bitstring('test', delimiter=' ') '01110100 01100101 01110011 01110100'
>>> to_bitstring(b'test') '01110100011001010111001101110100'
- pyutils.string_utils.to_bool(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to convert to boolean
- Returns:
A boolean equivalent of the original string based on its contents. All conversion is case insensitive. A positive boolean (True) is returned if the string value is any of the following:
”true”
”t”
”1”
”yes”
”y”
”on”
Otherwise False is returned.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
bool
See also
pyutils.argparse_utils
.>>> to_bool('True') True
>>> to_bool('1') True
>>> to_bool('yes') True
>>> to_bool('no') False
>>> to_bool('huh?') False
>>> to_bool('on') True
- pyutils.string_utils.to_char_list(in_str: str) List[str] [source]
- Parameters:
in_str (str) – the string to split into a char list
- Returns:
A list of strings of length one each.
- Return type:
List[str]
See also
from_char_list()
.>>> to_char_list('test') ['t', 'e', 's', 't']
- pyutils.string_utils.to_date(in_str: str) date | None [source]
- Parameters:
in_str (str) – the string to convert into a date
- Returns:
The datetime.date the string contained or None to indicate an error. This parser is relatively clever; see
datetimes.dateparse_utils
docs for details.- Return type:
date | None
See also:
pyutils.datetimes.dateparse_utils
,extract_date()
,is_valid_date()
,to_datetime()
,valid_datetime()
.>>> to_date('9/11/2001') datetime.date(2001, 9, 11) >>> to_date('xyzzy')
- pyutils.string_utils.to_datetime(in_str: str) datetime | None [source]
- Parameters:
in_str (str) – string to parse into a datetime
- Returns:
A python datetime parsed from in_str or None to indicate an error. This parser is relatively clever; see
datetimes.dateparse_utils
docs for details.- Return type:
datetime | None
See also:
pyutils.datetimes.dateparse_utils
,to_date()
,extract_date()
,valid_datetime()
.>>> to_datetime('7/20/1969 02:56 GMT') datetime.datetime(1969, 7, 20, 2, 56, tzinfo=<StaticTzInfo 'GMT'>)
- pyutils.string_utils.to_int(in_str: str) int [source]
- Parameters:
in_str (str) – the string to convert
- Returns:
The integral value of the string.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
int
See also
is_integer_number(), is_decimal_number(), is_hexidecimal_integer_number(), is_octal_integer_number(), is_binary_integer_number(), etc.
>>> to_int('1234')
1234
>>> to_int('0x1234')
4660
>>> to_int('0b01101')
13
>>> to_int('0o777')
511
>>> to_int('test')
Traceback (most recent call last):
...
ValueError: invalid literal for int() with base 10: 'test'
>>> to_int(123)
Traceback (most recent call last):
...
TypeError: 123
- pyutils.string_utils.trigrams(txt: str) Generator[str, str, None] [source]
Generates the trigrams (n=3) of the given string.
- Parameters:
txt (str) –
- Return type:
Generator[str, str, None]
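Because this entry ships without a doctest, here is a minimal, hedged usage sketch (not from the library's docstrings). The exact n-gram semantics (word vs. character trigrams) are not spelled out above, so the output is deliberately not asserted; print it to see what the generator yields:
from pyutils import string_utils

# Iterate whatever trigrams() yields for a sample input and inspect it.
for gram in string_utils.trigrams('the quick brown fox jumps'):
    print(gram)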
- pyutils.string_utils.valid_datetime(in_str: str) bool [source]
- Parameters:
in_str (str) – the string to check
- Returns:
True if in_str contains a valid datetime and False otherwise. This parser is relatively clever; see
datetimes.dateparse_utils
docs for details.- Return type:
bool
>>> valid_datetime('next wednesday at noon')
True
>>> valid_datetime('3 weeks ago at midnight')
True
>>> valid_datetime('next easter at 5:00 am')
True
>>> valid_datetime('sometime soon')
False
- pyutils.string_utils.word_count(in_str: str) int [source]
- Parameters:
in_str (str) – the string to count words in
- Returns:
The number of words contained in the given string.
- Return type:
int
Note
This method is “smart” in that it considers only sequences of one or more letters and/or digits to be “words”. Thus a string like “! @ # % … []” will return zero. Moreover, it is aware of punctuation, so the count for a string like “one,two,three.stop” is 4, not 1 (even though there are no spaces in the string).
>>> word_count('hello world') 2 >>> word_count('one,two,three.stop') 4
- pyutils.string_utils.words_count(in_str: str) int [source]
- Parameters:
in_str (str) – the string to count words in
- Returns:
The number of words contained in the given string.
- Raises:
TypeError – the input argument isn’t a string
- Return type:
int
Note
This method is “smart” in that it considers only sequences of one or more letters and/or digits to be “words”. Thus a string like “! @ # % … []” will return zero. Moreover, it is aware of punctuation, so the count for a string like “one,two,three.stop” is 4, not 1 (even though there are no spaces in the string).
>>> words_count('hello world') 2 >>> words_count('one,two,three.stop') 4
pyutils.text_utils module
Utilities for dealing with and creating text chunks. For example:
Make a bar graph / progress graph,
make a spark line,
left, right, center, justify text,
word wrap text,
indent / dedent text,
create a header line,
draw a box around some text.
- class pyutils.text_utils.BarGraphText(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]
Bases:
Enum
What kind of text to include at the end of the bar graph?
- FRACTION = (2,)
N / K
- NONE = (0,)
None, leave it blank.
- PERCENTAGE = (1,)
XX.X%
- class pyutils.text_utils.Indenter(*, pad_prefix: str | None = None, pad_char: str = ' ', pad_count: int = 4)[source]
Bases:
AbstractContextManager
Context manager that indents stuff (even recursively). e.g.:
with Indenter(pad_count=8) as i:
    i.print('test')
    with i:
        i.print('-ing')
        with i:
            i.print('1, 2, 3')
Yields:
test
        -ing
                1, 2, 3
Construct an Indenter.
- Parameters:
pad_prefix (str | None) – an optional prefix to prepend to each line
pad_char (str) – the character used to indent
pad_count (int) – the number of pad_chars to use to indent
- class pyutils.text_utils.RowsColumns(rows: int = 0, columns: int = 0)[source]
Bases:
object
Row + Column
- Parameters:
rows (int) –
columns (int) –
- columns: int = 0
Number of columns
- rows: int = 0
Number of rows
- pyutils.text_utils.bar_graph(current: int, total: int, *, width: int = 70, text: BarGraphText = BarGraphText.PERCENTAGE, fgcolor: str = '\x1b[38;2;255;216;0m', left_end: str = '[', right_end: str = ']', redraw: bool = True) None [source]
Draws a progress graph at the current cursor position.
- Parameters:
current (int) – how many have we done so far?
total (int) – how many are there to do total?
text (BarGraphText) – how should we render the text at the end?
width (int) – how many columns wide should the progress graph be?
fgcolor (str) – what color should “done” part of the graph be?
left_end (str) – the character at the left side of the graph
right_end (str) – the character at the right side of the graph
redraw (bool) – if True, omit a line feed after the carriage return so that subsequent calls to this method redraw the graph iteratively.
- Return type:
None
See also
bar_graph_string()
,sparkline()
.Example:
'[███████████████████████████████████ ] 0.5'
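A hedged usage sketch (not from the library's docstrings): because redraw defaults to True, repeated calls re-draw the graph in place, which is handy for showing progress over a loop. The loop body and sleep interval are made up for illustration:
import time

from pyutils import text_utils

total = 50
for done in range(total + 1):
    # Each call redraws the bar on the same console line (redraw=True).
    text_utils.bar_graph(done, total, width=60)
    time.sleep(0.05)
print()  # advance past the graph line when finished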
- pyutils.text_utils.bar_graph_string(current: int, total: int, *, text: BarGraphText = BarGraphText.PERCENTAGE, width: int = 70, fgcolor: str = '\x1b[38;2;255;216;0m', reset_seq: str = '\x1b[m', left_end: str = '[', right_end: str = ']') str [source]
Returns a string containing a bar graph.
- Parameters:
current (int) – how many have we done so far?
total (int) – how many are there to do total?
text (BarGraphText) – how should we render the text at the end?
width (int) – how many columns wide should the progress graph be?
fgcolor (str) – what color should “done” part of the graph be?
reset_seq (str) – sequence to use to turn off color
left_end (str) – the character at the left side of the graph
right_end (str) – the character at the right side of the graph
- Raises:
ValueError – if percentage is invalid
- Return type:
str
See also
bar_graph()
,sparkline()
.>>> bar_graph_string(5, 10, fgcolor='', reset_seq='') '[███████████████████████████████████ ] 0.5'
- pyutils.text_utils.box(title: str | None = None, text: str | None = None, *, width: int = 80, color: str = '') str [source]
Make a nice unicode box (optionally with color) around some text.
- Parameters:
title (str | None) – the title of the box
text (str | None) – the text in the box
width (int) – the box’s width
color (str) – the box’s color
- Returns:
the box as a string
- Return type:
str
See also
print_box()
,preformatted_box()
.>>> print(box('title', 'this is some text', width=20).strip()) ╭──────────────────╮ │ title │ │ │ │ this is some │ │ text │ ╰──────────────────╯
- pyutils.text_utils.distribute_strings(strings: List[str], *, width: int = 80, padding: str = ' ') str [source]
Distributes strings into a line for justified text.
- Parameters:
strings (List[str]) – a list of string tokens to distribute
width (int) – the width of the line to create
padding (str) – the padding character to place between string chunks
- Returns:
The distributed, justified string.
- Return type:
str
See also
justify_string()
,justify_text()
.>>> distribute_strings(['this', 'is', 'a', 'test'], width=40) ' this is a test '
- pyutils.text_utils.generate_padded_columns(text: List[str]) Generator [source]
Given a list of strings, break them into columns using
split()
and then compute the maximum width of each column. Finally, distribute the columnar chunks into the output, padding each to the proper width.
- Parameters:
text (List[str]) – a list of strings to chunk into padded columns
- Returns:
padded columns based on text.split()
- Return type:
Generator
>>> for x in generate_padded_columns( ... [ 'reading writing arithmetic', ... 'mathematics psychology physics', ... 'communications sociology anthropology' ]): ... print(x.strip()) reading writing arithmetic mathematics psychology physics communications sociology anthropology
- pyutils.text_utils.get_console_rows_columns() RowsColumns [source]
- Returns:
The number of rows/columns on the current console.
- Raises:
Exception – if the console size can’t be determined.
- Return type:
RowsColumns
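A hedged sketch (assumes a real terminal is attached; otherwise this raises, as described above) showing how the result can size other text_utils output:
from pyutils import text_utils

rc = text_utils.get_console_rows_columns()
# Use the detected console width to draw a full-width header line.
print(text_utils.header('status', width=rc.columns))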
- pyutils.text_utils.header(title: str, *, width: int | None = None, align: str | None = None, style: str | None = 'solid', color: str | None = None) str [source]
Creates a nice header line with a title.
- Parameters:
title (str) – the title
width (int | None) – how wide to make the header
align (str | None) – “left” or “right”
style (str | None) – “ascii”, “solid” or “dashed”
color (str | None) – what color to use, if any
- Returns:
The header as a string.
- Return type:
str
>>> header('title', width=60, style='ascii') '----[ title ]-----------------------------------------------'
- pyutils.text_utils.justify_string(string: str, *, width: int = 80, alignment: str = 'c', padding: str = ' ') str [source]
Justify a string to width with left, right, center, or justified alignment.
- Parameters:
string (str) – the string to justify
width (int) – the width to justify the string to
alignment (str) – a single character indicating the desired alignment: ‘c’ = centered within the width; ‘j’ = justified at width; ‘l’ = left alignment; ‘r’ = right alignment
padding (str) – the padding character to use while justifying
- Raises:
ValueError – if alignment argument is invalid.
- Return type:
str
>>> justify_string('This is another test', width=40, alignment='c') ' This is another test ' >>> justify_string('This is another test', width=40, alignment='l') 'This is another test ' >>> justify_string('This is another test', width=40, alignment='r') ' This is another test' >>> justify_string('This is another test', width=40, alignment='j') 'This is another test'
- pyutils.text_utils.justify_text(text: str, *, width: int = 80, alignment: str = 'c', indent_by: int = 0) str [source]
Justifies text with left, right, centered or justified alignment and optionally with initial indentation.
- Parameters:
text (str) – the text to be justified
width (int) – the width at which to justify text
alignment (str) – a single character indicating the desired alignment: ‘c’ = centered within the width; ‘j’ = justified at width; ‘l’ = left alignment; ‘r’ = right alignment
indent_by (int) – if non-zero, adds n prefix spaces to indent the text.
- Returns:
The justified text.
- Return type:
str
See also
justify_string().
>>> justify_text('This is a test of the emergency broadcast system. This is only a test.',
...     width=40, alignment='j')
'This is a test of the emergency\nbroadcast system. This is only a test.'
- pyutils.text_utils.preformatted_box(title: str | None = None, text: str | None = None, *, width: int = 80, color: str = '', kind: str = 'default') str [source]
Creates a nice box with rounded corners and returns it as a string.
- Parameters:
title (str | None) – the title of the box
text (str | None) – the text inside the box
width (int) – the width of the box
color (str) – the box’s color
kind (str) – the kind of box; “default”, “single”, “rounded”, “block”, “double”, “space”, “dashed”
- Returns:
the box as a string
- Return type:
str
See also
print_box()
,box()
.>>> print(preformatted_box('title', 'this\nis\nsome\ntext', width=20).strip()) ╭──────────────────╮ │ title │ │ │ │ this │ │ is │ │ some │ │ text │ ╰──────────────────╯
- pyutils.text_utils.print_box(title: str | None = None, text: str | None = None, *, width: int = 80, color: str = '', kind: str = 'default') None [source]
Draws a box with nice rounded corners.
- Parameters:
title (str | None) – the title of the box
text (str | None) – the text inside the box
width (int) – the width of the box
color (str) – the box’s color
kind (str) – the box type
- Returns:
None
- Return type:
None
- Side-effects:
Prints a box with your text on the console to sys.stdout.
See also
preformatted_box()
,box()
.>>> print_box('Title', 'This is text', width=30) ╭────────────────────────────╮ │ Title │ │ │ │ This is text │ ╰────────────────────────────╯
>>> print_box(None, 'OK', width=6) ╭────╮ │ OK │ ╰────╯
- pyutils.text_utils.sparkline(numbers: List[float]) Tuple[float, float, str] [source]
Makes a “sparkline”: a little inline histogram graph that auto-scales.
- Parameters:
numbers (List[float]) – the population over which to create the sparkline
- Returns:
the minimum number in the population
the maximum number in the population
a string representation of the population in a concise format
- Return type:
a three tuple containing
See also
bar_graph()
,bar_graph_string()
.>>> sparkline([1, 2, 3, 5, 10, 3, 5, 7]) (1, 10, '▁▁▂▄█▂▄▆')
>>> sparkline([104, 99, 93, 96, 82, 77, 85, 73]) (73, 104, '█▇▆▆▃▂▄▁')
pyutils.unittest_utils module
Helpers for unittests.
Warning
When you import this we automatically wrap the standard Python
unittest.main with a call to pyutils.bootstrap.initialize()
so that we get logger config, commandline args, logging control,
etc. This works fine but may be unexpected behavior.
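A hedged sketch of a typical test module; the import of pyutils.unittest_utils is what triggers the wrapping described in the warning above (the test class and method names are made up for illustration):
import unittest

from pyutils import unittest_utils  # importing wraps unittest.main, per the warning above


class ExampleTest(unittest.TestCase):
    def test_trivially_true(self):
        self.assertTrue(True)


if __name__ == '__main__':
    unittest.main()  # now also invokes pyutils.bootstrap.initialize(), per the warning above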
- class pyutils.unittest_utils.FileBasedPerfRegressionDataPersister(filename: str)[source]
Bases:
PerfRegressionDataPersister
A perf regression data persister that uses files.
- Parameters:
filename (str) – the filename to save/load historical performance data
- delete_performance_data(method_id: str)[source]
Delete the historical performance data of the supplied method.
- Parameters:
method_id (str) – the method whose data should be erased.
- load_performance_data(method_id: str) Dict[str, List[float]] [source]
Load the historical performance data for the supplied method.
- Parameters:
method_id (str) – the method for which we want historical perf data.
- Return type:
Dict[str, List[float]]
- save_performance_data(method_id: str, data: Dict[str, List[float]])[source]
Save the historical performance data of the supplied method.
- Parameters:
method_id (str) – the method whose historical perf data we’re saving.
data (Dict[str, List[float]]) – the historical performance data being persisted.
- class pyutils.unittest_utils.PerfRegressionDataPersister[source]
Bases:
ABC
A base class that defines an interface for dealing with persisting perf regression data.
- abstract delete_performance_data(method_id: str)[source]
Delete the historical performance data of the supplied method.
- Parameters:
method_id (str) – the method whose data should be erased.
- abstract load_performance_data(method_id: str) Dict[str, List[float]] [source]
Load the historical performance data for the supplied method.
- Parameters:
method_id (str) – the method for which we want historical perf data.
- Return type:
Dict[str, List[float]]
- abstract save_performance_data(method_id: str, data: Dict[str, List[float]])[source]
Save the historical performance data of the supplied method.
- Parameters:
method_id (str) – the method whose historical perf data we’re saving.
data (Dict[str, List[float]]) – the historical performance data being persisted.
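A hedged sketch of a custom persister, a toy in-memory implementation that is not part of the library, included only to illustrate the abstract interface above:
from typing import Dict, List

from pyutils.unittest_utils import PerfRegressionDataPersister


class InMemoryPerfRegressionDataPersister(PerfRegressionDataPersister):
    """Keeps historical perf data in a dict; only useful within one process."""

    def __init__(self) -> None:
        self._data: Dict[str, Dict[str, List[float]]] = {}

    def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
        # Return whatever history we have for this method (empty if none yet).
        return self._data.get(method_id, {})

    def save_performance_data(self, method_id: str, data: Dict[str, List[float]]) -> None:
        self._data[method_id] = data

    def delete_performance_data(self, method_id: str) -> None:
        self._data.pop(method_id, None)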
- class pyutils.unittest_utils.RecordMultipleStreams(*files)[source]
Bases:
AbstractContextManager
Record the output to more than one stream.
Example usage:
with RecordMultipleStreams(sys.stdout, sys.stderr) as r:
    print("This is a test!", file=sys.stderr)
    print("This is too", file=sys.stdout)
print(r().readlines())
r().close()
- class pyutils.unittest_utils.RecordStderr[source]
Bases:
AbstractContextManager
Record what is emitted to stderr.
>>> import sys
>>> with RecordStderr() as record:
...     print("This is a test!", file=sys.stderr)
>>> print({record().readline()})
{'This is a test!\n'}
>>> record().close()
- class pyutils.unittest_utils.RecordStdout[source]
Bases:
AbstractContextManager
Records what is emitted to stdout into a buffer instead.
>>> with RecordStdout() as record:
...     print("This is a test!")
>>> print({record().readline()})
{'This is a test!\n'}
>>> record().close()
- pyutils.unittest_utils.check_all_methods_for_perf_regressions(prefix: str = 'test_')[source]
This decorator is meant to apply to classes that subclass from
unittest.TestCase
and, when applied, has the effect of decorating each method that matches the given prefix with the check_method_for_perf_regressions()
wrapper (see above). This wrapper causes us to measure perf and fail tests that regress perf dramatically.- Parameters:
prefix (str) – the prefix of method names to check for regressions
See also
check_method_for_perf_regressions() to check only a single method.
Example usage. By decorating the class, all methods with names that begin with test_ will be perf monitored:
import pyutils.unittest_utils as uu

@uu.check_all_methods_for_perf_regressions()
class TestMyClass(unittest.TestCase):
    def test_some_part_of_my_class(self):
        ...

    def test_some_other_part_of_my_class(self):
        ...
- pyutils.unittest_utils.check_method_for_perf_regressions(func: Callable) Callable [source]
This decorator is meant to be used on a method in a class that subclasses
unittest.TestCase
. When decorated, method execution timing (i.e. performance) will be measured and compared with a database of historical performance for the same method. The wrapper will then fail the test with a perf-related message if it has become too slow.See also
check_all_methods_for_perf_regressions()
.Example usage:
class TestMyClass(unittest.TestCase): @check_method_for_perf_regressions def test_some_part_of_my_class(self): ...
- Parameters:
func (Callable) –
- Return type:
Callable
pyutils.unscrambler module
A fast (English) word unscrambler.
The first time you use this class it will attempt to read a list of words from /usr/share/dict/words (or whatever was passed as the --unscrambler_source_dictfile argument) and generate an index at ~/.sparse_index (or whatever was passed as the --unscrambler_default_indexfile argument). It should be ~fast and only happens once. See:
- class pyutils.unscrambler.Unscrambler(indexfile: str | None = None)[source]
Bases:
object
A class that unscrambles words quickly by computing a signature (sig) for the word based on its position-independent letter population and then using a pregenerated index to look up known words with the same set of letters.
Sigs are designed to cluster similar words near each other so both lookup methods support a “fuzzy match” argument that can be set to request similar words that do not match exactly in addition to any exact matches.
Note
Each instance of Unscrambler caches its index to speed up lookups after the first; careless deletion / reinstantiation will result in slower performance.
Constructs an unscrambler.
- Parameters:
indexfile (str | None) – overrides the default indexfile location if provided. To [re]generate the indexfile, see
repopulate()
.
- static compute_word_sig(word: str) int [source]
Given a word, compute its signature for subsequent lookup operations. Signatures are computed based on the letters in the word and their frequencies. We try to cluster “similar” words close to each other in the signature space.
- Parameters:
word (str) – the word to compute a signature for
- Returns:
The word’s signature.
- Return type:
int
>>> test = Unscrambler.compute_word_sig('test') >>> test 105560478284788
>>> teste = Unscrambler.compute_word_sig('teste') >>> teste 105562386542095
>>> teste - test 1908257307
- static get_dictfile(dictfile: str | None) str [source]
- Returns:
The current dictfile’s location.
- Parameters:
dictfile (str | None) –
- Return type:
str
- static get_indexfile(indexfile: str | None) str [source]
- Returns:
The current indexfile location, generates it if it doesn’t yet exist.
- Parameters:
indexfile (str | None) –
- Return type:
str
- lookup(word: str, *, window_size: int = 5) Dict[str, bool] [source]
Looks up a potentially scrambled word optionally including near “fuzzy” matches.
- Parameters:
word (str) – the word to lookup
window_size (int) – the number of nearby fuzzy matches to return
- Returns:
A dict of word -> bool containing unscrambled words with (close to or precisely) the same letters as the input word. The bool values in this dict indicate whether the key word is an exact or near match. The count of entries in this dict is controlled by the window_size param.
- Return type:
Dict[str, bool]
>>> u = Unscrambler() >>> u.lookup('eanycleocipd', window_size=0) {'encyclopedia': True}
- lookup_by_sig(sig: int, *, window_size: int = 5) Dict[str, bool] [source]
Looks up a word that has already been translated into a signature by a previous call to Unscrambler.compute_word_sig. Optionally returns near “fuzzy” matches.
- Parameters:
sig (int) – the signature of the word to lookup (see
compute_word_sig()
to generate these signatures).window_size (int) – the number of nearby fuzzy matches to return
- Returns:
A dict of word -> bool containing unscrambled words with (close to or precisely) the same letters as the input word. The bool values in this dict indicate whether the key word is an exact or near match. The count of entries in this dict is controlled by the window_size param.
- Return type:
Dict[str, bool]
>>> sig = Unscrambler.compute_word_sig('sunepsapetuargiarin') >>> sig 18491949645300288339
>>> u = Unscrambler() >>> u.lookup_by_sig(sig) {'scuppering': False, 'outcroppings': False, "outcropping's": False, 'supplicating': False, 'suppurating': False, 'uppercutting': False, 'pepping': False, 'pipping': False, 'popping': False, 'prepping': False, 'peppering': False}
- static repopulate(dictfile: str = '/usr/share/dict/words', indexfile: str = '/home/scott/.sparse_index') None [source]
Repopulates the indexfile.
- Parameters:
dictfile (str) – a file that contains one word per line
indexfile (str) – the file to populate with sig, word pairs for future use by this class.
- Return type:
None
Warning
Before calling this method, change letter_sigs from the default above unless you want to populate the same exact files.
pyutils.zookeeper module
This is a module for making it easier to deal with Zookeeper / Kazoo.
Apache Zookeeper (https://zookeeper.apache.org/) is a consistent centralized
datastore. pyutils.config
optionally uses it to save/read program
configuration. But it’s also very useful for things like distributed
master election, locking, etc…
- class pyutils.zookeeper.RenewableReleasableLease(client: ~kazoo.client.KazooClient, path: str, duration: ~datetime.timedelta, identifier: str | None = None, utcnow=<built-in method utcnow of type object>)[source]
Bases:
NonBlockingLease
This is a hacky subclass of kazoo.recipe.lease.NonBlockingLease (see https://kazoo.readthedocs.io/en/latest/api/recipe/lease.html#kazoo.recipe.lease.NonBlockingLease) that adds some behaviors:
Ability to renew the lease if it’s already held without going through the effort of reobtaining the same lease name.
Ability to release the lease if it’s held and not yet expired.
It also is more picky than the base class in terms of when it evaluates to “True” (indicating that the lease is held); it will begin to evaluate to “False” as soon as the lease has expired, even if you used to hold it. This means client code should be aware that the lease can disappear (expire) while held, and it also means that evaluating the lease (i.e. if lease:) requires a round trip to zookeeper every time.
Note that it is not valid to release the lease more than once (since you no longer have it the second time). The code ignores the 2nd..nth attempt. It’s also not possible to reobtain an expired or released lease by calling renew. Go create a new lease object at that point. Finally, note that when you renew the lease it will evaluate to False briefly as it is reobtained.
Construct the RenewableReleasableLease.
- Parameters:
client (KazooClient) – a KazooClient that is connected and started
path (str) – the path to the lease in zookeeper
duration (timedelta) – duration during which the lease is reserved
identifier (str | None) – unique name to use for this lease holder. Reuse in order to renew the lease.
utcnow – clock function, by default returning
datetime.datetime.utcnow()
. Used for testing.
- release() bool [source]
Release the lease, if it’s presently being held.
- Returns:
True if the lease was successfully released, False otherwise.
- Return type:
bool
- try_renew(duration: timedelta) bool [source]
Attempt to renew a lease that is currently held. Note that this will cause self to evaluate to False briefly as the lease is renewed.
- Parameters:
duration (timedelta) – the amount of additional time to add to the current lease expiration.
- Returns:
True if the lease was successfully renewed, False otherwise.
- Return type:
bool
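A hedged usage sketch (the Zookeeper address, lease path, and identifier are made up for illustration, and a reachable ensemble is assumed):
import datetime

from kazoo.client import KazooClient

client = KazooClient(hosts='127.0.0.1:2181')
client.start()
lease = RenewableReleasableLease(
    client,
    '/leases/nightly-job',
    datetime.timedelta(minutes=5),
    identifier='worker-1',
)
if lease:  # do we currently hold the lease?
    try:
        # ... do the work that requires the lease ...
        lease.try_renew(datetime.timedelta(minutes=5))
    finally:
        lease.release()
client.stop()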
- pyutils.zookeeper.get_started_zk_client() KazooClient [source]
- Returns:
A zk client library reference that has been connected and started using the address, certificates, and passphrase provided on the commandline.
- Return type:
KazooClient
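A hedged sketch (the znode path is made up for illustration) showing that the returned client is a normal, already-started kazoo.client.KazooClient:
zk = get_started_zk_client()
if zk.exists('/config/example'):
    value, stat = zk.get('/config/example')  # standard kazoo API
    print(value.decode())
zk.stop()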
- pyutils.zookeeper.get_zookeeper_config() Tuple[str, str, str] | None [source]
- Return type:
Tuple[str, str, str] | None
- pyutils.zookeeper.obtain_lease(f: Callable | None = None, *, lease_id: str = 'sphinx-build', contender_id: str = 'wannabe.house', duration: timedelta = datetime.timedelta(seconds=300), also_pass_lease: bool = False, also_pass_zk_client: bool = False)[source]
Obtain an exclusive lease identified by the lease_id name before invoking a function or skip invoking the function if the lease cannot be obtained.
Note that we use a hacky “RenewableReleasableLease” and not the kazoo NonBlockingLease because the former allows us to release the lease when the user code returns whereas the latter does not.
- Parameters:
lease_id (str) – string identifying the lease to obtain
contender_id (str) – string identifying who is attempting to obtain the lease
duration (timedelta) – how long should the lease be held, if obtained?
also_pass_lease (bool) – pass the lease into the user function
also_pass_zk_client (bool) – pass our zk client into the user function
f (Callable | None) –
>>> @obtain_lease(
...     lease_id='zookeeper_doctest',
...     duration=datetime.timedelta(seconds=5),
... )
... def f(name: str) -> int:
...     print(f'Hello, {name}')
...     return 123
>>> f('Scott') Hello, Scott 123
- pyutils.zookeeper.run_for_election(f: Callable | None = None, *, election_id: str = 'sphinx-build', contender_id: str = 'wannabe.house', also_pass_zk_client: bool = False)[source]
Run as a contender for a leader election. If/when we become the leader, invoke the user’s function.
The user’s function will be executed on a new thread and must accept a “stop processing” event that it must check regularly. This event will be set automatically by the wrapper in the event that we lose connection to zookeeper (and hence are no longer confident that we are still the leader).
The user’s function may return at any time which will cause the wrapper to also return and effectively cede leadership.
Because the user’s code is run in a separate thread, it may not return anything / whatever it returns will be dropped.
- Parameters:
election_id (str) – global string identifier for the election
contender_id (str) – string identifying who is running for leader
also_pass_zk_client (bool) – pass the zk client into the user code
f (Callable | None) –
>>> @run_for_election(
...     election_id='zookeeper_doctest',
...     also_pass_zk_client=True
... )
... def g(name: str, zk: KazooClient, stop_now: threading.Event):
...     import time
...     count = 0
...     while True:
...         print(f"Hello, {name}, I'm the leader.")
...         if stop_now.is_set():
...             print("Oops, not anymore?!")
...             return
...         time.sleep(0.1)
...         count += 1
...         if count >= 3:
...             print("I'm sick of being leader.")
...             return
>>> g("Scott") Hello, Scott, I'm the leader. Hello, Scott, I'm the leader. Hello, Scott, I'm the leader. I'm sick of being leader.