import sys
from typing import Dict, Optional, Tuple
-import string_utils
-
logger = logging.getLogger(__name__)
# https://en.wikipedia.org/wiki/ANSI_escape_code
*,
force_16color: bool = False,
force_216color: bool = False) -> str:
+ import string_utils
+
if name is not None and string_utils.is_full_string(name):
rgb = _find_color_by_name(name)
return fg(
red: Optional[int] = None,
green: Optional[int] = None,
blue: Optional[int] = None) -> Tuple[int, int, int]:
+ import string_utils
+
if name is not None and string_utils.is_full_string(name):
rgb = _find_color_by_name(name)
else:
*,
force_16color: bool = False,
force_216color: bool = False) -> str:
+ import string_utils
+
if name is not None and string_utils.is_full_string(name):
rgb = _find_color_by_name(name)
return bg(
import logging
import os
-import string_utils
-
logger = logging.getLogger(__name__)
def valid_bool(v):
if isinstance(v, bool):
return v
- return string_utils.to_bool(v)
+ from string_utils import to_bool
+ return to_bool(v)
def valid_ip(ip: str) -> str:
- s = string_utils.extract_ip_v4(ip.strip())
+ from string_utils import extract_ip_v4
+ s = extract_ip_v4(ip.strip())
if s is not None:
return s
msg = f"{ip} is an invalid IP address"
def valid_mac(mac: str) -> str:
- s = string_utils.extract_mac_address(mac)
+ from string_utils import extract_mac_address
+ s = extract_mac_address(mac)
if s is not None:
return s
msg = f"{mac} is an invalid MAC address"
def valid_date(txt: str) -> datetime.date:
- date = string_utils.to_date(txt)
+ from string_utils import to_date
+ date = to_date(txt)
if date is not None:
return date
msg = f'Cannot parse argument as a date: {txt}'
def valid_datetime(txt: str) -> datetime.datetime:
- dt = string_utils.to_datetime(txt)
+ from string_utils import to_datetime
+ dt = to_datetime(txt)
if dt is not None:
return dt
msg = f'Cannot parse argument as datetime: {txt}'
import argparse_utils
import config
-import logging_utils
logger = logging.getLogger(__name__)
def initialize(funct):
+ import logging_utils
+
"""Remember to initialize config and logging before running main."""
@functools.wraps(funct)
def initialize_wrapper(*args, **kwargs):
import sys
from typing import Any, Dict, List
-import string_utils
-
# Note: at this point in time, logging hasn't been configured and
# anything we log will come out the root logger.
def parse() -> Dict[str, Any]:
+ import string_utils
+
"""Main program should call this early in main()"""
global config_parse_called
if config_parse_called:
import acl
import bootstrap
-import datetime_utils
-import decorator_utils
+from decorator_utils import decorate_matching_methods_with
from dateparse.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
from dateparse.dateparse_utilsListener import dateparse_utilsListener # type: ignore
from dateparse.dateparse_utilsParser import dateparse_utilsParser # type: ignore
pass
-@decorator_utils.decorate_matching_methods_with(
+@decorate_matching_methods_with(
debug_parse,
acl=acl.StringWildcardBasedACL(
allowed_patterns=[
idea of "now" so that the code can be more easily unittested.
Leave as None for real use cases.
"""
+ from datetime_utils import TimeUnit
self.month_name_to_number = {
'jan': 1,
'feb': 2,
# These TimeUnits are defined in datetime_utils and are used as params
# to datetime_utils.n_timeunits_from_base.
self.time_delta_unit_to_constant = {
- 'hou': datetime_utils.TimeUnit.HOURS,
- 'min': datetime_utils.TimeUnit.MINUTES,
- 'sec': datetime_utils.TimeUnit.SECONDS,
+ 'hou': TimeUnit.HOURS,
+ 'min': TimeUnit.MINUTES,
+ 'sec': TimeUnit.SECONDS,
}
self.delta_unit_to_constant = {
- 'day': datetime_utils.TimeUnit.DAYS,
- 'wor': datetime_utils.TimeUnit.WORKDAYS,
- 'wee': datetime_utils.TimeUnit.WEEKS,
- 'mon': datetime_utils.TimeUnit.MONTHS,
- 'yea': datetime_utils.TimeUnit.YEARS,
+ 'day': TimeUnit.DAYS,
+ 'wor': TimeUnit.WORKDAYS,
+ 'wee': TimeUnit.WEEKS,
+ 'mon': TimeUnit.MONTHS,
+ 'yea': TimeUnit.YEARS,
}
self.override_now_for_test_purposes = override_now_for_test_purposes
self._reset()
def _reset(self):
"""Reset at init and between parses."""
+ from datetime_utils import datetime_to_date
if self.override_now_for_test_purposes is None:
self.now_datetime = datetime.datetime.now()
self.today = datetime.date.today()
else:
self.now_datetime = self.override_now_for_test_purposes
- self.today = datetime_utils.datetime_to_date(
+ self.today = datetime_to_date(
self.override_now_for_test_purposes
)
self.date: Optional[datetime.date] = None
def _figure_out_date_unit(self, orig: str) -> int:
"""Figure out what unit a date expression piece is talking about."""
+ from datetime_utils import TimeUnit
if 'month' in orig:
- return datetime_utils.TimeUnit.MONTHS
+ return TimeUnit.MONTHS
txt = orig.lower()[:3]
if txt in self.day_name_to_number:
return(self.day_name_to_number[txt])
def exitDateExpr(self, ctx: dateparse_utilsParser.DateExprContext) -> None:
"""When we leave the date expression, populate self.date."""
+ from datetime_utils import (
+ n_timeunits_from_base, datetime_to_date, date_to_datetime
+ )
if 'special' in self.context:
self.date = self._parse_special_date(self.context['special'])
else:
if 'delta_unit' not in self.context:
raise ParseException('Missing delta_unit?!')
unit = self.context['delta_unit']
- dt = datetime_utils.n_timeunits_from_base(
+ dt = n_timeunits_from_base(
count,
unit,
- datetime_utils.date_to_datetime(self.date)
+ date_to_datetime(self.date)
)
- self.date = datetime_utils.datetime_to_date(dt)
+ self.date = datetime_to_date(dt)
def exitTimeExpr(self, ctx: dateparse_utilsParser.TimeExprContext) -> None:
# Simple time?
+ from datetime_utils import TimeUnit
self.time = datetime.time(
self.context['hour'],
self.context['minute'],
self.timedelta += datetime.timedelta(minutes=count)
else:
unit = self.context['time_delta_unit']
- if unit == datetime_utils.TimeUnit.SECONDS:
+ if unit == TimeUnit.SECONDS:
self.timedelta += datetime.timedelta(seconds=count)
- elif unit == datetime_utils.TimeUnit.MINUTES:
+ elif unit == TimeUnit.MINUTES:
self.timedelta = datetime.timedelta(minutes=count)
- elif unit == datetime_utils.TimeUnit.HOURS:
+ elif unit == TimeUnit.HOURS:
self.timedelta = datetime.timedelta(hours=count)
else:
raise ParseException()
def exitDeltaTimeFraction(
self, ctx: dateparse_utilsParser.DeltaTimeFractionContext
) -> None:
+ from datetime_utils import TimeUnit
try:
txt = ctx.getText().lower()[:4]
if txt == 'quar':
self.context['time_delta_int'] = 15
self.context[
'time_delta_unit'
- ] = datetime_utils.TimeUnit.MINUTES
+ ] = TimeUnit.MINUTES
elif txt == 'half':
self.context['time_delta_int'] = 30
self.context[
'time_delta_unit'
- ] = datetime_utils.TimeUnit.MINUTES
+ ] = TimeUnit.MINUTES
else:
raise ParseException(f'Bad time fraction {ctx.getText()}')
except:
def exitNFoosFromTodayAgoExpr(
self, ctx: dateparse_utilsParser.NFoosFromTodayAgoExprContext
) -> None:
+ from datetime_utils import n_timeunits_from_base
d = self.now_datetime
try:
count = self._get_int(ctx.unsignedInt().getText())
count = -count
unit = self._figure_out_date_unit(unit)
- d = datetime_utils.n_timeunits_from_base(
+ d = n_timeunits_from_base(
count,
unit,
d)
def exitDeltaRelativeToTodayExpr(
self, ctx: dateparse_utilsParser.DeltaRelativeToTodayExprContext
) -> None:
+ from datetime_utils import n_timeunits_from_base
d = self.now_datetime
try:
mod = ctx.thisNextLast()
f'Bad DeltaRelativeToTodayExpr: {ctx.getText()}'
)
unit = self._figure_out_date_unit(unit)
- d = datetime_utils.n_timeunits_from_base(
+ d = n_timeunits_from_base(
count,
unit,
d)
import warnings
import exceptions
-import thread_utils
logger = logging.getLogger(__name__)
parameter. The function is wrapped and returned to the caller.
"""
if use_signals is None:
+ import thread_utils
use_signals = thread_utils.is_current_thread_main_thread()
def decorate(function):
from itertools import islice
from typing import Any, Callable, Dict, Iterator, Tuple
-import list_utils
-
-
def init_or_inc(
d: Dict[Any, Any],
key: Any,
def coalesce_by_creating_list(key, v1, v2):
- return list_utils.flatten([v1, v2])
+ from list_utils import flatten
+ return flatten([v1, v2])
def coalesce_by_creating_set(key, v1, v2):
import os
from typing import Any, Optional
-import file_utils
-
class DirectoryFileFilter(object):
"""A predicate that will return False if when a proposed file's
"""
def __init__(self, directory: str):
+ import file_utils
super().__init__()
if not file_utils.does_directory_exist(directory):
raise ValueError(directory)
self._update_file(path, mtime)
def _update_file(self, filename: str, mtime: Optional[float] = None):
+ import file_utils
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
print(self.all_md5s)
def _update_file(self, filename: str, mtime: Optional[float] = None):
+ import file_utils
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
from ansi import bg, fg, underline, reset
import argparse_utils
import config
-import exec_utils
+from exec_utils import run_silently, cmd_in_background
from decorator_utils import singleton
import histogram
-import string_utils
logger = logging.getLogger(__name__)
if hostname not in machine:
cmd = f'{RSYNC} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
logger.info(f"Copying work to {worker} via {cmd}")
- exec_utils.run_silently(cmd)
+ run_silently(cmd)
# Do it.
cmd = (f'{SSH} {bundle.username}@{bundle.machine} '
f'"source remote-execution/bin/activate &&'
f' /home/scott/lib/python_modules/remote_worker.py'
f' --code_file {bundle.code_file} --result_file {bundle.result_file}"')
- p = exec_utils.cmd_in_background(cmd, silent=True)
+ p = cmd_in_background(cmd, silent=True)
bundle.pid = pid = p.pid
logger.info(f"Running {cmd} in the background as process {pid}")
f"Fetching results from {username}@{machine} via {cmd}"
)
try:
- exec_utils.run_silently(cmd)
+ run_silently(cmd)
except subprocess.CalledProcessError:
pass
- exec_utils.run_silently(f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"')
+ run_silently(f'{SSH} {username}@{machine}'
+ f' "/bin/rm -f {code_file} {result_file}"')
bundle.end_ts = time.time()
assert bundle.worker is not None
self.status.record_release_worker_already_locked(
return result
def create_original_bundle(self, pickle):
- uuid = string_utils.generate_uuid(as_hex=True)
+ from string_utils import generate_uuid
+ uuid = generate_uuid(as_hex=True)
code_file = f'/tmp/{uuid}.code.bin'
result_file = f'/tmp/{uuid}.result.bin'
import glob
from os.path import isfile, join, exists
-import datetime_utils
-
-
logger = logging.getLogger(__name__)
def describe_file_timestamp(
filename: str, extractor, *, brief=False
) -> Optional[str]:
+ from datetime_utils import describe_duration, describe_duration_briefly
age = get_file_timestamp_age_seconds(filename, extractor)
if age is None:
return None
if brief:
- return datetime_utils.describe_duration_briefly(age)
+ return describe_duration_briefly(age)
else:
- return datetime_utils.describe_duration(age)
+ return describe_duration(age)
def describe_file_atime(filename: str, *, brief=False) -> Optional[str]:
from numbers import Number
from typing import Generic, Iterable, List, Optional, Tuple, TypeVar
-from math_utils import RunningMedian
-from text_utils import bar_graph
-
-
T = TypeVar("T", bound=Number)
NEGATIVE_INFINITY = -math.inf
def __init__(self, buckets: List[Tuple[T, T]]):
+ from math_utils import RunningMedian
self.buckets = {}
for start_end in buckets:
if self._get_bucket(start_end[0]) is not None:
return all_true
def __repr__(self) -> str:
+ from text_utils import bar_graph
max_population: Optional[int] = None
for bucket in self.buckets:
pop = self.buckets[bucket]
import argparse_utils
import config
-import logical_search
import logging_utils
-import google_assistant as goog
+from google_assistant import ask_google, GoogleResponse
from decorator_utils import timeout, memoized
logger = logging.getLogger(__name__)
return name.replace("_", " ")
@staticmethod
- def parse_google_response(response: goog.GoogleResponse) -> bool:
+ def parse_google_response(response: GoogleResponse) -> bool:
return response.success
def turn_on(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} on")
+ ask_google(f"turn {self.goog_name()} on")
)
def turn_off(self) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"turn {self.goog_name()} off")
+ ask_google(f"turn {self.goog_name()} off")
)
def is_on(self) -> bool:
- r = goog.ask_google(f"is {self.goog_name()} on?")
+ r = ask_google(f"is {self.goog_name()} on?")
if not r.success:
return False
return 'is on' in r.audio_transcription
def get_dimmer_level(self) -> Optional[int]:
if not self.has_keyword("dimmer"):
return False
- r = goog.ask_google(f'how bright is {self.goog_name()}?')
+ r = ask_google(f'how bright is {self.goog_name()}?')
if not r.success:
return None
return False
if 0 <= level <= 100:
was_on = self.is_on()
- r = goog.ask_google(f"set {self.goog_name()} to {level} percent")
+ r = ask_google(f"set {self.goog_name()} to {level} percent")
if not r.success:
return False
if not was_on:
def make_color(self, color: str) -> bool:
return GoogleLight.parse_google_response(
- goog.ask_google(f"make {self.goog_name()} {color}")
+ ask_google(f"make {self.goog_name()} {color}")
)
self,
config_file: str = None,
) -> None:
+ import logical_search
if config_file is None:
config_file = config.config[
'light_utils_network_mac_addresses_location'
import argparse_utils
import config
-import string_utils as su
-import thread_utils as tu
cfg = config.add_commandline_args(
f'Logging ({__file__})',
def tprint(*args, **kwargs) -> None:
if config.config['logging_debug_threads']:
- print(f'{tu.current_thread_id()}', end="")
+ from thread_utils import current_thread_id
+ print(f'{current_thread_id()}', end="")
print(*args, **kwargs)
else:
pass
self.destination_bitv = destination_bitv
def print(self, *args, **kwargs):
+ from string_utils import sprintf, strip_escape_sequences
end = kwargs.pop("end", None)
if end is not None:
if not isinstance(end, str):
raise TypeError("sep must be None or a string")
if kwargs:
raise TypeError("invalid keyword arguments to print()")
- buf = su.sprintf(*args, end="", sep=sep)
+ buf = sprintf(*args, end="", sep=sep)
if sep is None:
sep = " "
if end is None:
if self.destination_bitv & self.FILENAME and self.f is not None:
self.f.write(buf.encode('utf-8'))
self.f.flush()
- buf = su.strip_escape_sequences(buf)
+ buf = strip_escape_sequences(buf)
if self.logger is not None:
if self.destination_bitv & self.LOG_DEBUG:
self.logger.debug(buf)
import os
-import string_utils
-
-
def is_running_as_root() -> bool:
return os.geteuid() == 0
from ansi import bold, reset
import argparse_utils
import config
-import datetime_utils
-import decorator_utils
-import input_utils
-import list_utils
+from decorator_utils import timed
import parallelize as par
-import smart_future
-import string_utils
-import text_utils
logger = logging.getLogger(__file__)
self.spec = None
def train(self, spec: InputSpec) -> OutputSpec:
+ import smart_future
+
random.seed()
self.spec = spec
def make_progress_graph(self) -> None:
if not self.spec.quiet:
- text_utils.progress_graph(self.file_done_count,
- self.total_file_count)
+ from text_utils import progress_graph
+ progress_graph(
+ self.file_done_count,
+ self.total_file_count
+ )
- @decorator_utils.timed
+ @timed
def read_input_files(self):
+ import list_utils
+ import smart_future
+
# All features
X = []
scaler: Any,
model: Any) -> Tuple[Optional[str], Optional[str], Optional[str]]:
if not self.spec.dry_run:
+ import datetime_utils
+ import input_utils
+ import string_utils
+
if (
(self.spec.persist_percentage_threshold is not None and
test_score > self.spec.persist_percentage_threshold)
import argparse_utils
import config
-import input_utils
logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
def label(in_spec: InputSpec) -> None:
+ import input_utils
+
images = []
if in_spec.image_file_glob is not None:
images += glob.glob(in_spec.image_file_glob)
import functools
import typing
-import executors
-import smart_future
-
ps_count = 0
thread_count = 0
remote_count = 0
@functools.wraps(funct)
def inner_wrapper(*args, **kwargs):
+ import executors
+ import smart_future
+
# Look for as of yet unresolved arguments in _funct's
# argument list and resolve them now.
newargs = []
import argparse_utils
import config
-import dict_utils
-import exec_utils
logger = logging.getLogger(__name__)
self.update()
def update(self) -> None:
+ from exec_utils import cmd
persisted_macs = config.config['presence_macs_file']
self.read_persisted_macs_file(persisted_macs, Location.HOUSE)
- raw = exec_utils.cmd(
+ raw = cmd(
)
self.parse_raw_macs_file(raw, Location.CABIN)
return False
def where_is_person_now(self, name: Person) -> Location:
+ import dict_utils
+
if name is Person.UNKNOWN:
if self.weird_mac_at_cabin:
return Location.CABIN
import pytz
-import math_utils
from thread_utils import background_thread
logger = logging.getLogger(__name__)
*,
override_sleep_delay: Optional[float] = None,
) -> None:
+ import math_utils
super().__init__(update_ids_to_update_secs)
if override_sleep_delay is not None:
logger.debug(f'Overriding sleep delay to {override_sleep_delay}')
import unicodedata
from uuid import uuid4
-import dateparse.dateparse_utils as dp
-
-
logger = logging.getLogger(__name__)
NUMBER_RE = re.compile(r"^([+\-]?)((\d+)(\.\d+)?([e|E]\d+)?|\.\d+)$")
def to_date(in_str: str) -> Optional[datetime.date]:
+ import dateparse.dateparse_utils as dp
try:
d = dp.DateParser()
d.parse(in_str)
def valid_date(in_str: str) -> bool:
+ import dateparse.dateparse_utils as dp
try:
d = dp.DateParser()
_ = d.parse(in_str)
def to_datetime(in_str: str) -> Optional[datetime.datetime]:
+ import dateparse.dateparse_utils as dp
try:
d = dp.DateParser()
dt = d.parse(in_str)
from typing import List, NamedTuple
from ansi import fg, reset
-import exec_utils
class RowsColumns(NamedTuple):
def get_console_rows_columns() -> RowsColumns:
- rows, columns = exec_utils.cmd("stty size").split()
+ from exec_utils import cmd
+ rows, columns = cmd("stty size").split()
return RowsColumns(int(rows), int(columns))