Order.DENY_ALLOW,
):
raise Exception(
- 'order_to_check_allow_deny must be Order.ALLOW_DENY or '
- + 'Order.DENY_ALLOW'
+ 'order_to_check_allow_deny must be Order.ALLOW_DENY or ' + 'Order.DENY_ALLOW'
)
self.order_to_check_allow_deny = order_to_check_allow_deny
self.default_answer = default_answer
allow_predicates = []
if allowed_patterns is not None:
for pattern in allowed_patterns:
- allow_predicates.append(
- lambda x, pattern=pattern: fnmatch.fnmatch(x, pattern)
- )
+ allow_predicates.append(lambda x, pattern=pattern: fnmatch.fnmatch(x, pattern))
deny_predicates = None
if denied_patterns is not None:
deny_predicates = []
for pattern in denied_patterns:
- deny_predicates.append(
- lambda x, pattern=pattern: fnmatch.fnmatch(x, pattern)
- )
+ deny_predicates.append(lambda x, pattern=pattern: fnmatch.fnmatch(x, pattern))
super().__init__(
allow_predicate_list=allow_predicates,
if allowed_regexs is not None:
allow_predicates = []
for pattern in allowed_regexs:
- allow_predicates.append(
- lambda x, pattern=pattern: pattern.match(x) is not None
- )
+ allow_predicates.append(lambda x, pattern=pattern: pattern.match(x) is not None)
deny_predicates = None
if denied_regexs is not None:
deny_predicates = []
for pattern in denied_regexs:
- deny_predicates.append(
- lambda x, pattern=pattern: pattern.match(x) is not None
- )
+ deny_predicates.append(lambda x, pattern=pattern: pattern.match(x) is not None)
super().__init__(
allow_predicate_list=allow_predicates,
deny_predicate_list=deny_predicates,
if (is_16color(red) and is_16color(green) and is_16color(blue)) or force_16color:
logger.debug("Using 16-color strategy")
return fg_16color(red, green, blue)
- if (
- is_216color(red) and is_216color(green) and is_216color(blue)
- ) or force_216color:
+ if (is_216color(red) and is_216color(green) and is_216color(blue)) or force_216color:
logger.debug("Using 216-color strategy")
return fg_216color(red, green, blue)
logger.debug("Using 24-bit color strategy")
if (is_16color(red) and is_16color(green) and is_16color(blue)) or force_16color:
logger.debug("Using 16-color strategy")
return bg_16color(red, green, blue)
- if (
- is_216color(red) and is_216color(green) and is_216color(blue)
- ) or force_216color:
+ if (is_216color(red) and is_216color(green) and is_216color(blue)) or force_216color:
logger.debug("Using 216-color strategy")
return bg_216color(red, green, blue)
logger.debug("Using 24-bit color strategy")
_ = pick_contrasting_color(possibility)
xf = fg(None, _[0], _[1], _[2])
xb = bg(None, _[0], _[1], _[2])
- print(
- f'{f}{xb}{possibility}{reset()}\t\t\t'
- f'{b}{xf}{possibility}{reset()}'
- )
+ print(f'{f}{xb}{possibility}{reset()}\t\t\t' f'{b}{xf}{possibility}{reset()}')
main()
self.update_from_arp_scan()
self.update_from_arp()
if len(self.state) < config.config['arper_min_entries_to_be_valid']:
- raise Exception(
- f'Arper didn\'t find enough entries; only got {len(self.state)}.'
- )
+ raise Exception(f'Arper didn\'t find enough entries; only got {len(self.state)}.')
def update_from_arp_scan(self):
network_spec = site_config.get_config().network
for line in output.split('\n'):
ip = string_utils.extract_ip_v4(line)
mac = string_utils.extract_mac_address(line)
- if (
- ip is not None
- and mac is not None
- and mac != 'UNKNOWN'
- and ip != 'UNKNOWN'
- ):
+ if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
mac = mac.lower()
logger.debug(f'ARPER: {mac} => {ip}')
self.state[mac] = ip
for line in output.split('\n'):
ip = string_utils.extract_ip_v4(line)
mac = string_utils.extract_mac_address(line)
- if (
- ip is not None
- and mac is not None
- and mac != 'UNKNOWN'
- and ip != 'UNKNOWN'
- ):
+ if ip is not None and mac is not None and mac != 'UNKNOWN' and ip != 'UNKNOWN':
mac = mac.lower()
logger.debug(f'ARPER: {mac} => {ip}')
self.state[mac] = ip
self.run_location = site_config.get_location()
logger.debug(f"run_location is {self.run_location}")
self.weird_mac_at_cabin = False
- self.location_ts_by_mac: Dict[
- Location, Dict[str, datetime.datetime]
- ] = defaultdict(dict)
+ self.location_ts_by_mac: Dict[Location, Dict[str, datetime.datetime]] = defaultdict(dict)
self.names_by_mac: Dict[str, str] = {}
self.dark_locations: Set[Location] = set()
self.last_update: Optional[datetime.datetime] = None
def where_is_person_now(self, name: Person) -> Location:
self.maybe_update()
if len(self.dark_locations) > 0:
- msg = (
- f"Can't see {self.dark_locations} right now; answer confidence impacted"
- )
+ msg = f"Can't see {self.dark_locations} right now; answer confidence impacted"
logger.warning(msg)
warnings.warn(msg, stacklevel=2)
logger.debug(f'Looking for {name}...')
return 'importlib' in filename or 'six.py' in filename
def find_module(self, fullname, path):
- raise Exception(
- "This method has been deprecated since Python 3.4, please upgrade."
- )
+ raise Exception("This method has been deprecated since Python 3.4, please upgrade.")
def find_spec(self, loaded_module, path=None, target=None):
s = stack()
# Try to figure out the name of the program entry point. Then
# parse configuration (based on cmdline flags, environment vars
# etc...)
- if (
- '__globals__' in entry_point.__dict__
- and '__file__' in entry_point.__globals__
- ):
+ if '__globals__' in entry_point.__dict__ and '__file__' in entry_point.__globals__:
config.parse(entry_point.__globals__['__file__'])
else:
config.parse(None)
last_dt = now
dt = now
for (day, txt) in zip(
- forecast.find_all('b'), forecast.find_all(class_='col-sm-10 forecast-text')
+ forecast.find_all('b'),
+ forecast.find_all(class_='col-sm-10 forecast-text'),
):
last_dt = dt
try:
assert dt is not None
# Compute sunrise/sunset times on dt.
- city = astral.LocationInfo(
- "Bellevue", "USA", "US/Pacific", 47.653, -122.171
- )
+ city = astral.LocationInfo("Bellevue", "USA", "US/Pacific", 47.653, -122.171)
s = sun(city.observer, date=dt, tzinfo=pytz.timezone("US/Pacific"))
sunrise = s['sunrise']
sunset = s['sunset']
for r in range(rows):
for c in range(cols):
pixel = hsv[(r, c)]
- if (
- is_near(pixel[0], 16)
- and is_near(pixel[1], 117)
- and is_near(pixel[2], 196)
- ):
+ if is_near(pixel[0], 16) and is_near(pixel[1], 117) and is_near(pixel[2], 196):
weird_orange_count += 1
elif is_near(pixel[0], 0) and is_near(pixel[1], 0):
hs_zero_count += 1
"""Fetch the raw webcam image from the video server."""
camera_name = camera_name.replace(".house", "")
camera_name = camera_name.replace(".cabin", "")
- url = f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
+ url = (
+ f"http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/jpeg/GKlT2FfiSQ/{camera_name}/s.jpg"
+ )
logger.debug(f'Fetching image from {url}')
try:
response = requests.get(url, stream=False, timeout=10.0)
@decorator_utils.retry_if_none(tries=2, delay_sec=1, backoff=1.1)
-def fetch_camera_image_from_rtsp_stream(
- camera_name: str, *, width: int = 256
-) -> Optional[bytes]:
+def fetch_camera_image_from_rtsp_stream(camera_name: str, *, width: int = 256) -> Optional[bytes]:
"""Fetch the raw webcam image straight from the webcam's RTSP stream."""
hostname = camera_name_to_hostname(camera_name)
stream = f"rtsp://camera:IaLaIok@{hostname}:554/live"
f"scale={width}:-1",
"-",
]
- with subprocess.Popen(
- cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL
- ) as proc:
+ with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) as proc:
out, _ = proc.communicate(timeout=10)
return out
except Exception as e:
@decorator_utils.timeout(seconds=30, use_signals=False)
-def _fetch_camera_image(
- camera_name: str, *, width: int = 256, quality: int = 70
-) -> RawJpgHsv:
+def _fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
"""Fetch a webcam image given the camera name."""
logger.debug("Trying to fetch camera image from video server")
- raw = fetch_camera_image_from_video_server(
- camera_name, width=width, quality=quality
- )
+ raw = fetch_camera_image_from_video_server(camera_name, width=width, quality=quality)
if raw is None:
logger.debug("Reading from video server failed; trying direct RTSP stream")
raw = fetch_camera_image_from_rtsp_stream(camera_name, width=width)
return RawJpgHsv(None, None, None)
-def fetch_camera_image(
- camera_name: str, *, width: int = 256, quality: int = 70
-) -> RawJpgHsv:
+def fetch_camera_image(camera_name: str, *, width: int = 256, quality: int = 70) -> RawJpgHsv:
try:
return _fetch_camera_image(camera_name, width=width, quality=quality)
except exceptions.TimeoutError:
#!/usr/bin/env python3
+
class BiDict(dict):
def __init__(self, *args, **kwargs):
"""
if __name__ == '__main__':
import doctest
+
doctest.testmod()
return self._find(value, node.right)
return None
- def _parent_path(
- self, current: Optional[Node], target: Node
- ) -> List[Optional[Node]]:
+ def _parent_path(self, current: Optional[Node], target: Node) -> List[Optional[Node]]:
if current is None:
return [None]
ret: List[Optional[Node]] = [current]
return self.depth()
def repr_traverse(
- self, padding: str, pointer: str, node: Optional[Node], has_right_sibling: bool
+ self,
+ padding: str,
+ pointer: str,
+ node: Optional[Node],
+ has_right_sibling: bool,
) -> str:
if node is not None:
viz = f'\n{padding}{pointer}{node.value}'
else:
pointer_left = "└──"
- viz += self.repr_traverse(
- padding, pointer_left, node.left, node.right is not None
- )
+ viz += self.repr_traverse(padding, pointer_left, node.left, node.right is not None)
viz += self.repr_traverse(padding, pointer_right, node.right, False)
return viz
return ""
else:
pointer_left = "├──"
- ret += self.repr_traverse(
- '', pointer_left, self.root.left, self.root.left is not None
- )
+ ret += self.repr_traverse('', pointer_left, self.root.left, self.root.left is not None)
ret += self.repr_traverse('', pointer_right, self.root.right, False)
return ret
from contextlib import contextmanager
from functools import wraps
from multiprocessing import RLock, shared_memory
-from typing import (
- Any,
- Dict,
- Generator,
- ItemsView,
- Iterator,
- KeysView,
- Optional,
- ValuesView,
-)
+from typing import Any, Dict, Generator, ItemsView, Iterator, KeysView, Optional, ValuesView
from decorator_utils import synchronized
return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)
def _ensure_memory_initialization(self):
- memory_is_empty = (
- bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b''
- )
+ memory_is_empty = bytes(self.shared_memory.buf).split(SharedDict.NULL_BYTE, 1)[0] == b''
if memory_is_empty:
self.clear()
for examples.
"""
+
def __init__(self):
self.root = {}
self.end = "~END~"
return None
return [x for x in node if x != self.end]
- def repr_fancy(self, padding: str, pointer: str, parent: str, node: Any, has_sibling: bool):
+ def repr_fancy(
+ self,
+ padding: str,
+ pointer: str,
+ parent: str,
+ node: Any,
+ has_sibling: bool,
+ ):
if node is None:
return
if node is not self.root:
if __name__ == '__main__':
import doctest
+
doctest.testmod()
if env in os.environ:
if not is_flag_already_in_argv(var):
value = os.environ[env]
- saved_messages.append(
- f'Initialized from environment: {var} = {value}'
- )
+ saved_messages.append(f'Initialized from environment: {var} = {value}')
from string_utils import to_bool
if len(chunks) == 1 and to_bool(value):
raise Exception(
f'Encountered unrecognized config argument(s) {unknown} with --config_rejects_unrecognized_arguments enabled; halting.'
)
- saved_messages.append(
- f'Config encountered unrecognized commandline arguments: {unknown}'
- )
+ saved_messages.append(f'Config encountered unrecognized commandline arguments: {unknown}')
sys.argv = sys.argv[:1] + unknown
# Check for savefile and populate it if requested.
return _convert(magnitude, src, dst)
-def _convert(
- magnitude: SupportsFloat, from_unit: Converter, to_unit: Converter
-) -> float:
+def _convert(magnitude: SupportsFloat, from_unit: Converter, to_unit: Converter) -> float:
canonical = from_unit.to_canonical(magnitude)
converted = to_unit.from_canonical(canonical)
return float(converted)
from dateparse.dateparse_utilsLexer import dateparse_utilsLexer # type: ignore
from dateparse.dateparse_utilsListener import dateparse_utilsListener # type: ignore
from dateparse.dateparse_utilsParser import dateparse_utilsParser # type: ignore
-from datetime_utils import (
- TimeUnit,
- date_to_datetime,
- datetime_to_date,
- n_timeunits_from_base,
-)
+from datetime_utils import TimeUnit, date_to_datetime, datetime_to_date, n_timeunits_from_base
logger = logging.getLogger(__name__)
def syntaxError(self, recognizer, offendingSymbol, line, column, msg, e):
raise ParseException(msg)
- def reportAmbiguity(
- self, recognizer, dfa, startIndex, stopIndex, exact, ambigAlts, configs
- ):
+ def reportAmbiguity(self, recognizer, dfa, startIndex, stopIndex, exact, ambigAlts, configs):
pass
def reportAttemptingFullContext(
):
pass
- def reportContextSensitivity(
- self, recognizer, dfa, startIndex, stopIndex, prediction, configs
- ):
+ def reportContextSensitivity(self, recognizer, dfa, startIndex, stopIndex, prediction, configs):
pass
micros = self.time.microsecond
self.datetime = datetime.datetime(
- year, month, day, hour, minute, second, micros, tzinfo=self.time.tzinfo
+ year,
+ month,
+ day,
+ hour,
+ minute,
+ second,
+ micros,
+ tzinfo=self.time.tzinfo,
)
# Apply resudual adjustments to times here when we have a
else:
raise ParseException(f'Invalid Unit: "{unit}"')
- def exitDeltaPlusMinusExpr(
- self, ctx: dateparse_utilsParser.DeltaPlusMinusExprContext
- ) -> None:
+ def exitDeltaPlusMinusExpr(self, ctx: dateparse_utilsParser.DeltaPlusMinusExprContext) -> None:
try:
n = ctx.nth()
if n is None:
else:
self.context['delta_unit'] = unit
- def exitDeltaNextLast(
- self, ctx: dateparse_utilsParser.DeltaNextLastContext
- ) -> None:
+ def exitDeltaNextLast(self, ctx: dateparse_utilsParser.DeltaNextLastContext) -> None:
try:
txt = ctx.getText().lower()
except Exception:
raise ParseException(f'Bad next/last: {ctx.getText()}')
if 'month' in self.context or 'day' in self.context or 'year' in self.context:
- raise ParseException(
- 'Next/last expression expected to be relative to today.'
- )
+ raise ParseException('Next/last expression expected to be relative to today.')
if txt[:4] == 'next':
self.context['delta_int'] = +1
self.context['day'] = self.now_datetime.day
if 'time_delta_before_after' not in self.context:
raise ParseException(f'Bad Before/After: {ctx.getText()}')
- def exitDeltaTimeFraction(
- self, ctx: dateparse_utilsParser.DeltaTimeFractionContext
- ) -> None:
+ def exitDeltaTimeFraction(self, ctx: dateparse_utilsParser.DeltaTimeFractionContext) -> None:
try:
txt = ctx.getText().lower()[:4]
if txt == 'quar':
except Exception:
raise ParseException(f'Bad time fraction {ctx.getText()}')
- def exitDeltaBeforeAfter(
- self, ctx: dateparse_utilsParser.DeltaBeforeAfterContext
- ) -> None:
+ def exitDeltaBeforeAfter(self, ctx: dateparse_utilsParser.DeltaBeforeAfterContext) -> None:
try:
txt = ctx.getText().lower()
except Exception:
else:
self.context['delta_before_after'] = txt
- def exitDeltaTimeBeforeAfter(
- self, ctx: dateparse_utilsParser.DeltaBeforeAfterContext
- ) -> None:
+ def exitDeltaTimeBeforeAfter(self, ctx: dateparse_utilsParser.DeltaBeforeAfterContext) -> None:
try:
txt = ctx.getText().lower()
except Exception:
special = ctx.specialDate().getText().lower()
self.context['special'] = special
except Exception:
- raise ParseException(
- f'Bad specialDate expression: {ctx.specialDate().getText()}'
- )
+ raise ParseException(f'Bad specialDate expression: {ctx.specialDate().getText()}')
try:
mod = ctx.thisNextLast()
if mod is not None:
self.context['month'] = d.month
self.context['day'] = d.day
- def exitSpecialTimeExpr(
- self, ctx: dateparse_utilsParser.SpecialTimeExprContext
- ) -> None:
+ def exitSpecialTimeExpr(self, ctx: dateparse_utilsParser.SpecialTimeExprContext) -> None:
try:
txt = ctx.specialTime().getText().lower()
except Exception:
except Exception:
pass
- def exitTwelveHourTimeExpr(
- self, ctx: dateparse_utilsParser.TwelveHourTimeExprContext
- ) -> None:
+ def exitTwelveHourTimeExpr(self, ctx: dateparse_utilsParser.TwelveHourTimeExprContext) -> None:
try:
hour = ctx.hour().getText()
while not hour[-1].isdigit():
return datetime.datetime.combine(now, time, tz)
-def date_and_time_to_datetime(
- date: datetime.date, time: datetime.time
-) -> datetime.datetime:
+def date_and_time_to_datetime(date: datetime.date, time: datetime.time) -> datetime.datetime:
"""
Given a date and time, merge them and return a datetime.
return False
-def n_timeunits_from_base(
- count: int, unit: TimeUnit, base: datetime.datetime
-) -> datetime.datetime:
+def n_timeunits_from_base(count: int, unit: TimeUnit, base: datetime.datetime) -> datetime.datetime:
"""Return a datetime that is N units before/after a base datetime.
e.g. 3 Wednesdays from base datetime, 2 weeks from base date, 10
years before base datetime, 13 minutes after base datetime, etc...
base += timedelta
if base.year != old_year:
skips = holidays.US(years=base.year).keys()
- if (
- base.weekday() < 5
- and datetime.date(base.year, base.month, base.day) not in skips
- ):
+ if base.weekday() < 5 and datetime.date(base.year, base.month, base.day) not in skips:
count -= 1
return base
'1d 10m'
"""
- return describe_duration_briefly(
- int(delta.total_seconds())
- ) # Note: drops milliseconds
+ return describe_duration_briefly(int(delta.total_seconds())) # Note: drops milliseconds
if __name__ == '__main__':
logger.debug(f'@{time.time()}> calling it...')
ret = func(*args, **kargs)
last_invocation_timestamp[0] = time.time()
- logger.debug(
- f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}'
- )
+ logger.debug(f'@{time.time()}> Last invocation <- {last_invocation_timestamp[0]}')
cv.notify()
return ret
def __call__(self, *args, **kwargs):
"""Returns a single instance of decorated class"""
- logger.debug(
- f"@singleton returning global instance of {self.__wrapped__.__name__}"
- )
+ logger.debug(f"@singleton returning global instance of {self.__wrapped__.__name__}")
if self._instance is None:
self._instance = self.__wrapped__(*args, **kwargs)
return self._instance
self.__limit = kwargs.pop("timeout", self.__limit)
self.__queue = multiprocessing.Queue(1)
args = (self.__queue, self.__function) + args
- self.__process = multiprocessing.Process(
- target=_target, args=args, kwargs=kwargs
- )
+ self.__process = multiprocessing.Process(target=_target, args=args, kwargs=kwargs)
self.__process.daemon = True
self.__process.start()
if self.__limit is not None:
@functools.wraps(function)
def new_function(*args, **kwargs):
- timeout_wrapper = _Timeout(
- function, timeout_exception, error_message, seconds
- )
+ timeout_wrapper = _Timeout(function, timeout_exception, error_message, seconds)
return timeout_wrapper(*args, **kwargs)
return new_function
for arg in args:
newargs.append(arg)
start = time.time()
- result = self._thread_pool_executor.submit(
- self.run_local_bundle, *newargs, **kwargs
- )
+ result = self._thread_pool_executor.submit(self.run_local_bundle, *newargs, **kwargs)
result.add_done_callback(lambda _: self.histogram.add_item(time.time() - start))
result.add_done_callback(lambda _: self.adjust_task_count(-1))
return result
self.start_time: float = time.time()
self.start_per_bundle: Dict[str, Optional[float]] = defaultdict(float)
self.end_per_bundle: Dict[str, float] = defaultdict(float)
- self.finished_bundle_timings_per_worker: Dict[
- RemoteWorkerRecord, List[float]
- ] = {}
+ self.finished_bundle_timings_per_worker: Dict[RemoteWorkerRecord, List[float]] = {}
self.in_flight_bundles_by_worker: Dict[RemoteWorkerRecord, Set[str]] = {}
self.bundle_details_by_uuid: Dict[str, BundleDetails] = {}
self.finished_bundle_timings: List[float] = []
with self.lock:
self.record_acquire_worker_already_locked(worker, uuid)
- def record_acquire_worker_already_locked(
- self, worker: RemoteWorkerRecord, uuid: str
- ) -> None:
+ def record_acquire_worker_already_locked(self, worker: RemoteWorkerRecord, uuid: str) -> None:
assert self.lock.locked()
self.known_workers.add(worker)
self.start_per_bundle[uuid] = None
grabbag.append(worker)
if len(grabbag) == 0:
- logger.debug(
- f'There are no available workers that avoid {machine_to_avoid}...'
- )
+ logger.debug(f'There are no available workers that avoid {machine_to_avoid}...')
for worker in self.workers:
if worker.count > 0:
for _ in range(worker.count * worker.weight):
return False
@overrides
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
x = self.index
while True:
worker = self.workers[x]
raise RemoteExecutorException(msg)
self.policy.register_worker_pool(self.workers)
self.cv = threading.Condition()
- logger.debug(
- f'Creating {self.worker_count} local threads, one per remote worker.'
- )
+ logger.debug(f'Creating {self.worker_count} local threads, one per remote worker.')
self._helper_executor = fut.ThreadPoolExecutor(
thread_name_prefix="remote_executor_helper",
max_workers=self.worker_count,
if start_ts is not None:
runtime = now - start_ts
score += runtime
- logger.debug(
- f'score[{bundle}] => {score} # latency boost'
- )
+ logger.debug(f'score[{bundle}] => {score} # latency boost')
if bundle.slower_than_local_p95:
score += runtime / 2
- logger.debug(
- f'score[{bundle}] => {score} # >worker p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >worker p95')
if bundle.slower_than_global_p95:
score += runtime / 4
- logger.debug(
- f'score[{bundle}] => {score} # >global p95'
- )
+ logger.debug(f'score[{bundle}] => {score} # >global p95')
# Prefer backups of bundles that don't
# have backups already.
f'score[{bundle}] => {score} # {backup_count} dup backup factor'
)
- if score != 0 and (
- best_score is None or score > best_score
- ):
+ if score != 0 and (best_score is None or score > best_score):
bundle_to_backup = bundle
assert bundle is not None
assert bundle.backup_bundles is not None
def is_worker_available(self) -> bool:
return self.policy.is_worker_available()
- def acquire_worker(
- self, machine_to_avoid: str = None
- ) -> Optional[RemoteWorkerRecord]:
+ def acquire_worker(self, machine_to_avoid: str = None) -> Optional[RemoteWorkerRecord]:
return self.policy.acquire_worker(machine_to_avoid)
- def find_available_worker_or_block(
- self, machine_to_avoid: str = None
- ) -> RemoteWorkerRecord:
+ def find_available_worker_or_block(self, machine_to_avoid: str = None) -> RemoteWorkerRecord:
with self.cv:
while not self.is_worker_available():
self.cv.wait()
try:
return self.process_work_result(bundle)
except Exception as e:
- logger.warning(
- f'{bundle}: bundle says it\'s cancelled upfront but no results?!'
- )
+ logger.warning(f'{bundle}: bundle says it\'s cancelled upfront but no results?!')
self.release_worker(bundle)
if is_original:
# Weird. We are the original owner of this
# Send input code / data to worker machine if it's not local.
if hostname not in machine:
try:
- cmd = (
- f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
- )
+ cmd = f'{SCP} {bundle.code_file} {username}@{machine}:{bundle.code_file}'
start_ts = time.time()
logger.info(f"{bundle}: Copying work to {worker} via {cmd}.")
run_silently(cmd)
logger.debug(f'{bundle}: Executing {cmd} in the background to kick off work...')
p = cmd_in_background(cmd, silent=True)
bundle.pid = p.pid
- logger.debug(
- f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.'
- )
+ logger.debug(f'{bundle}: Local ssh process pid={p.pid}; remote worker is {machine}.')
return self.wait_for_process(p, bundle, 0)
def wait_for_process(
p.wait(timeout=0.25)
except subprocess.TimeoutExpired:
if self.check_if_cancelled(bundle):
- logger.info(
- f'{bundle}: looks like another worker finished bundle...'
- )
+ logger.info(f'{bundle}: looks like another worker finished bundle...')
break
else:
logger.info(f"{bundle}: pid {pid} ({machine}) is finished!")
break
run_silently(
- f'{SSH} {username}@{machine}'
- f' "/bin/rm -f {code_file} {result_file}"'
- )
- logger.debug(
- f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.'
+ f'{SSH} {username}@{machine}' f' "/bin/rm -f {code_file} {result_file}"'
)
+ logger.debug(f'Fetching results back took {time.time() - bundle.end_ts:.1f}s.')
dur = bundle.end_ts - bundle.start_ts
self.histogram.add_item(dur)
# backup.
if bundle.backup_bundles is not None:
for backup in bundle.backup_bundles:
- logger.debug(
- f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled'
- )
+ logger.debug(f'{bundle}: Notifying backup {backup.uuid} that it\'s cancelled')
backup.is_cancelled.set()
# This is a backup job and, by now, we have already fetched
if not was_cancelled:
orig_bundle = bundle.src_bundle
assert orig_bundle is not None
- logger.debug(
- f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
- )
+ logger.debug(f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.')
orig_bundle.is_cancelled.set()
self.release_worker(bundle, was_cancelled=was_cancelled)
return result
# they will move the result_file to this machine and let
# the original pick them up and unpickle them.
- def emergency_retry_nasty_bundle(
- self, bundle: BundleDetails
- ) -> Optional[fut.Future]:
+ def emergency_retry_nasty_bundle(self, bundle: BundleDetails) -> Optional[fut.Future]:
is_original = bundle.src_bundle is None
bundle.worker = None
avoid_last_machine = bundle.machine
def ping(self, host) -> bool:
logger.debug(f'RUN> ping -c 1 {host}')
try:
- x = cmd_with_timeout(
- f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0
- )
+ x = cmd_with_timeout(f'ping -c 1 {host} >/dev/null 2>/dev/null', timeout_seconds=1.0)
return x == 0
except Exception:
return False
os.utime(filename, None)
-def convert_file_timestamp_to_datetime(
- filename: str, producer
-) -> Optional[datetime.datetime]:
+def convert_file_timestamp_to_datetime(filename: str, producer) -> Optional[datetime.datetime]:
ts = producer(filename)
if ts is not None:
return datetime.datetime.fromtimestamp(ts)
return get_file_timestamp_age_seconds(filename, lambda x: x.st_mtime)
-def get_file_timestamp_timedelta(
- filename: str, extractor
-) -> Optional[datetime.timedelta]:
+def get_file_timestamp_timedelta(filename: str, extractor) -> Optional[datetime.timedelta]:
age = get_file_timestamp_age_seconds(filename, extractor)
if age is not None:
return datetime.timedelta(seconds=float(age))
if self.locktime:
ts = datetime.datetime.now().timestamp()
duration = ts - self.locktime
- if (
- duration
- >= config.config['lockfile_held_duration_warning_threshold_sec']
- ):
+ if duration >= config.config['lockfile_held_duration_warning_threshold_sec']:
str_duration = datetime_utils.describe_duration_briefly(duration)
msg = f'Held {self.lockfile} for {str_duration}'
logger.warning(msg)
per_scope_logging_levels: str,
) -> None:
super().__init__()
- self.valid_levels = set(
- ['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']
- )
+ self.valid_levels = set(['NOTSET', 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'])
self.default_logging_level = default_logging_level
self.level_by_scope = {}
if per_scope_logging_levels is not None:
file=sys.stderr,
)
continue
- self.level_by_scope[
- scope
- ] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(level)
+ self.level_by_scope[scope] = DynamicPerScopeLoggingLevelFilter.level_name_to_level(
+ level
+ )
@overrides
def filter(self, record: logging.LogRecord) -> bool:
@overrides
def formatTime(self, record, datefmt=None):
- ct = MillisecondAwareFormatter.converter(
- record.created, pytz.timezone("US/Pacific")
- )
+ ct = MillisecondAwareFormatter.converter(record.created, pytz.timezone("US/Pacific"))
if datefmt:
s = ct.strftime(datefmt)
else:
def log_about_logging(
- logger, default_logging_level, preexisting_handlers_count, fmt, facility_name
+ logger,
+ default_logging_level,
+ preexisting_handlers_count,
+ fmt,
+ facility_name,
):
- level_name = logging._levelToName.get(
- default_logging_level, str(default_logging_level)
- )
+ level_name = logging._levelToName.get(default_logging_level, str(default_logging_level))
logger.debug(f'Initialized global logging; default logging level is {level_name}.')
- if (
- config.config['logging_clear_preexisting_handlers']
- and preexisting_handlers_count > 0
- ):
+ if config.config['logging_clear_preexisting_handlers'] and preexisting_handlers_count > 0:
msg = f'Logging cleared {preexisting_handlers_count} global handlers (--logging_clear_preexisting_handlers)'
logger.warning(msg)
logger.debug(f'Logging format specification is "{fmt}"')
if config.config['logging_debug_threads']:
- logger.debug(
- '...Logging format spec captures tid/pid (--logging_debug_threads)'
- )
+ logger.debug('...Logging format spec captures tid/pid (--logging_debug_threads)')
if config.config['logging_debug_modules']:
logger.debug(
'...Logging format spec captures files/functions/lineno (--logging_debug_modules)'
)
if config.config['logging_syslog']:
- logger.debug(
- f'Logging to syslog as {facility_name} with priority mapping based on level'
- )
+ logger.debug(f'Logging to syslog as {facility_name} with priority mapping based on level')
if config.config['logging_filename']:
logger.debug(f'Logging to filename {config.config["logging_filename"]}')
- logger.debug(
- f'...with {config.config["logging_filename_maxsize"]} bytes max file size.'
- )
+ logger.debug(f'...with {config.config["logging_filename_maxsize"]} bytes max file size.')
logger.debug(
f'...and {config.config["logging_filename_count"]} rotating backup file count.'
)
handler: Optional[logging.Handler] = None
# Global default logging level (--logging_level)
- default_logging_level = getattr(
- logging, config.config['logging_level'].upper(), None
- )
+ default_logging_level = getattr(logging, config.config['logging_level'].upper(), None)
if not isinstance(default_logging_level, int):
raise ValueError('Invalid level: %s' % config.config['logging_level'])
def invert_docid_set(self, original: Set[str]) -> Set[str]:
"""Invert a set of docids."""
- return set(
- [docid for docid in self.documents_by_docid.keys() if docid not in original]
- )
+ return set([docid for docid in self.documents_by_docid.keys() if docid not in original])
def get_doc(self, docid: str) -> Optional[Document]:
"""Given a docid, retrieve the previously added Document."""
operation = Operation.from_token(token)
operand_count = operation.num_operands()
if len(node_stack) < operand_count:
- raise ParseError(
- f"Incorrect number of operations for {operation}"
- )
+ raise ParseError(f"Incorrect number of operations for {operation}")
for _ in range(operation.num_operands()):
args.append(node_stack.pop())
node = Node(corpus, operation, args)
try:
key, value = tag.split(":")
except ValueError as v:
- raise ParseError(
- f'Invalid key:value syntax at "{tag}"'
- ) from v
+ raise ParseError(f'Invalid key:value syntax at "{tag}"') from v
if value == "*":
r = self.corpus.get_docids_with_property(key)
else:
logger = logging.getLogger(__file__)
parser = config.add_commandline_args(
- f"ML Model Trainer ({__file__})", "Arguments related to training an ML model"
+ f"ML Model Trainer ({__file__})",
+ "Arguments related to training an ML model",
)
parser.add_argument(
"--ml_trainer_quiet",
try:
(key, value) = line.split(self.spec.key_value_delimiter)
except Exception:
- logger.debug(
- f"WARNING: bad line in file {filename} '{line}', skipped"
- )
+ logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
continue
key = key.strip()
value = value.strip()
- if (
- self.spec.features_to_skip is not None
- and key in self.spec.features_to_skip
- ):
+ if self.spec.features_to_skip is not None and key in self.spec.features_to_skip:
logger.debug(f"Skipping feature {key}")
continue
# Note: children should implement. Consider using @parallelize.
@abstractmethod
- def train_model(
- self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray
- ) -> Any:
+ def train_model(self, parameters, X_train_scaled: np.ndarray, y_train: np.ndarray) -> Any:
pass
def evaluate_model(
self.spec.persist_percentage_threshold is not None
and test_score > self.spec.persist_percentage_threshold
) or (
- not self.spec.quiet
- and input_utils.yn_response("Write the model? [y,n]: ") == "y"
+ not self.spec.quiet and input_utils.yn_response("Write the model? [y,n]: ") == "y"
):
scaler_filename = f"{self.spec.basename}_scaler.sav"
with open(scaler_filename, "wb") as fb:
logger.debug(f'Attempting to instantiate {cls.__name__} directly.')
self.instance = cls(*args, **kwargs)
else:
- logger.debug(
- f'Class {cls.__name__} was loaded from persisted state successfully.'
- )
+ logger.debug(f'Class {cls.__name__} was loaded from persisted state successfully.')
was_loaded = True
assert self.instance is not None
if self.persist_at_shutdown is PersistAtShutdown.ALWAYS or (
- not was_loaded
- and self.persist_at_shutdown is PersistAtShutdown.IF_NOT_LOADED
+ not was_loaded and self.persist_at_shutdown is PersistAtShutdown.IF_NOT_LOADED
):
- logger.debug(
- 'Scheduling a deferred called to save at process shutdown time.'
- )
+ logger.debug('Scheduling a deferred called to save at process shutdown time.')
atexit.register(self.instance.save)
return self.instance
@overrides
def turn_on(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} on")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} on"))
@overrides
def turn_off(self) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"turn {self.goog_name()} off")
- )
+ return GoogleLight.parse_google_response(ask_google(f"turn {self.goog_name()} off"))
@overrides
def status(self) -> str:
@overrides
def make_color(self, color: str) -> bool:
- return GoogleLight.parse_google_response(
- ask_google(f"make {self.goog_name()} {color}")
- )
+ return GoogleLight.parse_google_response(ask_google(f"make {self.goog_name()} {color}"))
class TuyaLight(BaseLight):
@overrides
def turn_on(self) -> bool:
- return GoogleOutlet.parse_google_response(
- ask_google(f'turn {self.goog_name()} on')
- )
+ return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} on'))
@overrides
def turn_off(self) -> bool:
- return GoogleOutlet.parse_google_response(
- ask_google(f'turn {self.goog_name()} off')
- )
+ return GoogleOutlet.parse_google_response(ask_google(f'turn {self.goog_name()} off'))
@overrides
def is_on(self) -> bool:
def __init__(self):
self.loop = asyncio.get_event_loop()
self.email = os.environ.get('MEROSS_EMAIL') or scott_secrets.MEROSS_EMAIL
- self.password = (
- os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
- )
+ self.password = os.environ.get('MEROSS_PASSWORD') or scott_secrets.MEROSS_PASSWORD
self.devices = self.loop.run_until_complete(self.find_meross_devices())
atexit.register(self.loop.close)
#!/usr/bin/env python3
import logging
-from typing import Optional
import urllib.request
+from typing import Optional
logger = logging.getLogger()
'cabin_hottub': ('192.168.0.107', 'hottub_temp'),
}
- def read_temperature(
- self, location: str, *, convert_to_fahrenheit=False
- ) -> Optional[float]:
+ def read_temperature(self, location: str, *, convert_to_fahrenheit=False) -> Optional[float]:
record = self.thermometers.get(location, None)
if record is None:
logger.error(
temp = www.read().decode('utf-8')
temp = float(temp)
if convert_to_fahrenheit:
- temp *= (9/5)
+ temp *= 9 / 5
temp += 32.0
temp = round(temp)
except Exception as e:
override_sleep_delay: Optional[float] = None,
) -> None:
self._something_changed = threading.Event()
- super().__init__(
- update_ids_to_update_secs, override_sleep_delay=override_sleep_delay
- )
+ super().__init__(update_ids_to_update_secs, override_sleep_delay=override_sleep_delay)
def something_changed(self):
self._something_changed.set()
ESCAPED_AT_SIGN = re.compile(r'(?!"[^"]*)@+(?=[^"]*")|\\@')
-EMAILS_RAW_STRING = (
- r"[a-zA-Z\d._\+\-'`!%#$&*/=\?\^\{\}\|~\\]+@[a-z\d-]+\.?[a-z\d-]+\.[a-z]{2,4}"
-)
+EMAILS_RAW_STRING = r"[a-zA-Z\d._\+\-'`!%#$&*/=\?\^\{\}\|~\\]+@[a-z\d-]+\.?[a-z\d-]+\.[a-z]{2,4}"
EMAIL_RE = re.compile(r"^{}$".format(EMAILS_RAW_STRING))
CAMEL_CASE_REPLACE_RE = re.compile(r"([a-z]|[A-Z]+)(?=[A-Z])")
-SNAKE_CASE_TEST_RE = re.compile(
- r"^([a-z]+\d*_[a-z\d_]*|_+[a-z\d]+[a-z\d_]*)$", re.IGNORECASE
-)
+SNAKE_CASE_TEST_RE = re.compile(r"^([a-z]+\d*_[a-z\d_]*|_+[a-z\d]+[a-z\d_]*)$", re.IGNORECASE)
-SNAKE_CASE_TEST_DASH_RE = re.compile(
- r"([a-z]+\d*-[a-z\d-]*|-+[a-z\d]+[a-z\d-]*)$", re.IGNORECASE
-)
+SNAKE_CASE_TEST_DASH_RE = re.compile(r"([a-z]+\d*-[a-z\d-]*|-+[a-z\d]+[a-z\d-]*)$", re.IGNORECASE)
SNAKE_CASE_REPLACE_RE = re.compile(r"(_)([a-z\d])")
JSON_WRAPPER_RE = re.compile(r"^\s*[\[{]\s*(.*)\s*[\}\]]\s*$", re.MULTILINE | re.DOTALL)
-UUID_RE = re.compile(
- r"^[a-f\d]{8}-[a-f\d]{4}-[a-f\d]{4}-[a-f\d]{4}-[a-f\d]{12}$", re.IGNORECASE
-)
+UUID_RE = re.compile(r"^[a-f\d]{8}-[a-f\d]{4}-[a-f\d]{4}-[a-f\d]{4}-[a-f\d]{12}$", re.IGNORECASE)
UUID_HEX_OK_RE = re.compile(
r"^[a-f\d]{8}-?[a-f\d]{4}-?[a-f\d]{4}-?[a-f\d]{4}-?[a-f\d]{12}$",
MAC_ADDRESS_RE = re.compile(r"^([0-9A-F]{2}[:-]){5}([0-9A-F]{2})$", re.IGNORECASE)
-ANYWHERE_MAC_ADDRESS_RE = re.compile(
- r"([0-9A-F]{2}[:-]){5}([0-9A-F]{2})", re.IGNORECASE
-)
+ANYWHERE_MAC_ADDRESS_RE = re.compile(r"([0-9A-F]{2}[:-]){5}([0-9A-F]{2})", re.IGNORECASE)
WORDS_COUNT_RE = re.compile(r"\W*[^\W_]+\W*", re.IGNORECASE | re.MULTILINE | re.UNICODE)
if isinstance(in_str, numbers.Number):
in_str = f'{in_str}'
if is_number(in_str):
- return _add_thousands_separator(
- in_str, separator_char=separator_char, places=places
- )
+ return _add_thousands_separator(in_str, separator_char=separator_char, places=places)
raise ValueError(in_str)
# Special cases: can't, shan't and won't.
txt = re.sub(r'\b(can)\s*no(t)\b', r"\1'\2", txt, count=0, flags=re.IGNORECASE)
+ txt = re.sub(r'\b(sha)ll\s*(n)o(t)\b', r"\1\2'\3", txt, count=0, flags=re.IGNORECASE)
txt = re.sub(
- r'\b(sha)ll\s*(n)o(t)\b', r"\1\2'\3", txt, count=0, flags=re.IGNORECASE
- )
- txt = re.sub(
- r'\b(w)ill\s*(n)(o)(t)\b', r"\1\3\2'\4", txt, count=0, flags=re.IGNORECASE
+ r'\b(w)ill\s*(n)(o)(t)\b',
+ r"\1\3\2'\4",
+ txt,
+ count=0,
+ flags=re.IGNORECASE,
)
for first_list, second_list in first_second:
yield txt[x : x + chunk_size]
-def to_bitstring(
- txt: str, *, delimiter='', encoding='utf-8', errors='surrogatepass'
-) -> str:
+def to_bitstring(txt: str, *, delimiter='', encoding='utf-8', errors='surrogatepass') -> str:
"""Encode txt and then chop it into bytes. Note: only bitstrings
with delimiter='' are interpretable by from_bitstring.
import unittest
-from type.centcount import CentCount
import unittest_utils as uu
+from type.centcount import CentCount
class TestCentCount(unittest.TestCase):
-
def test_basic_utility(self):
amount = CentCount(1.45)
another = CentCount.parse("USD 1.45")
import dateparse.dateparse_utils as du
import unittest_utils as uu
-
parsable_expressions = [
- ('today',
- datetime.datetime(2021, 7, 2)),
- ('tomorrow',
- datetime.datetime(2021, 7, 3)),
- ('yesterday',
- datetime.datetime(2021, 7, 1)),
- ('21:30',
- datetime.datetime(2021, 7, 2, 21, 30, 0, 0)),
- ('12:01am',
- datetime.datetime(2021, 7, 2, 0, 1, 0, 0)),
- ('12:02p',
- datetime.datetime(2021, 7, 2, 12, 2, 0, 0)),
- ('0:03',
- datetime.datetime(2021, 7, 2, 0, 3, 0, 0)),
- ('last wednesday',
- datetime.datetime(2021, 6, 30)),
- ('this wed',
- datetime.datetime(2021, 7, 7)),
- ('next wed',
- datetime.datetime(2021, 7, 14)),
- ('this coming tues',
- datetime.datetime(2021, 7, 6)),
- ('this past monday',
- datetime.datetime(2021, 6, 28)),
- ('4 days ago',
- datetime.datetime(2021, 6, 28)),
- ('4 mondays ago',
- datetime.datetime(2021, 6, 7)),
- ('4 months ago',
- datetime.datetime(2021, 3, 2)),
- ('3 days back',
- datetime.datetime(2021, 6, 29)),
- ('13 weeks from now',
- datetime.datetime(2021, 10, 1)),
- ('1 year from now',
- datetime.datetime(2022, 7, 2)),
- ('4 weeks from now',
- datetime.datetime(2021, 7, 30)),
- ('3 saturdays ago',
- datetime.datetime(2021, 6, 12)),
- ('4 months from today',
- datetime.datetime(2021, 11, 2)),
- ('4 years from yesterday',
- datetime.datetime(2025, 7, 1)),
- ('4 weeks from tomorrow',
- datetime.datetime(2021, 7, 31)),
- ('april 15, 2005',
- datetime.datetime(2005, 4, 15)),
- ('april 14',
- datetime.datetime(2021, 4, 14)),
- ('9:30am on last wednesday',
- datetime.datetime(2021, 6, 30, 9, 30)),
- ('2005/apr/15',
- datetime.datetime(2005, 4, 15)),
- ('2005 apr 15',
- datetime.datetime(2005, 4, 15)),
- ('the 1st wednesday in may',
- datetime.datetime(2021, 5, 5)),
- ('last sun of june',
- datetime.datetime(2021, 6, 27)),
- ('this Easter',
- datetime.datetime(2021, 4, 4)),
- ('last christmas',
- datetime.datetime(2020, 12, 25)),
- ('last Xmas',
- datetime.datetime(2020, 12, 25)),
- ('xmas, 1999',
- datetime.datetime(1999, 12, 25)),
- ('next mlk day',
- datetime.datetime(2022, 1, 17)),
- ('Halloween, 2020',
- datetime.datetime(2020, 10, 31)),
- ('5 work days after independence day',
- datetime.datetime(2021, 7, 12)),
- ('50 working days from last wed',
- datetime.datetime(2021, 9, 10)),
- ('25 working days before columbus day',
- datetime.datetime(2021, 9, 3)),
- ('today +1 week',
- datetime.datetime(2021, 7, 9)),
- ('sunday -3 weeks',
- datetime.datetime(2021, 6, 13)),
- ('4 weeks before xmas, 1999',
- datetime.datetime(1999, 11, 27)),
- ('3 days before new years eve, 2000',
- datetime.datetime(2000, 12, 28)),
- ('july 4th',
- datetime.datetime(2021, 7, 4)),
- ('the ides of march',
- datetime.datetime(2021, 3, 15)),
- ('the nones of april',
- datetime.datetime(2021, 4, 5)),
- ('the kalends of may',
- datetime.datetime(2021, 5, 1)),
- ('9/11/2001',
- datetime.datetime(2001, 9, 11)),
- ('4 sundays before veterans\' day',
- datetime.datetime(2021, 10, 17)),
- ('xmas eve',
- datetime.datetime(2021, 12, 24)),
- ('this friday at 5pm',
- datetime.datetime(2021, 7, 9, 17, 0, 0)),
- ('presidents day',
- datetime.datetime(2021, 2, 15)),
- ('memorial day, 1921',
- datetime.datetime(1921, 5, 30)),
- ('today -4 wednesdays',
- datetime.datetime(2021, 6, 9)),
- ('thanksgiving',
- datetime.datetime(2021, 11, 25)),
- ('2 sun in jun',
- datetime.datetime(2021, 6, 13)),
- ('easter -40 days',
- datetime.datetime(2021, 2, 23)),
- ('easter +39 days',
- datetime.datetime(2021, 5, 13)),
- ('1st tuesday in nov, 2024',
- datetime.datetime(2024, 11, 5)),
- ('2 days before last xmas at 3:14:15.92a',
- datetime.datetime(2020, 12, 23, 3, 14, 15, 92)),
- ('3 weeks after xmas, 1995 at midday',
- datetime.datetime(1996, 1, 15, 12, 0, 0)),
- ('4 months before easter, 1992 at midnight',
- datetime.datetime(1991, 12, 19)),
- ('5 months before halloween, 1995 at noon',
- datetime.datetime(1995, 5, 31, 12)),
- ('4 days before last wednesday',
- datetime.datetime(2021, 6, 26)),
- ('44 months after today',
- datetime.datetime(2025, 3, 2)),
- ('44 years before today',
- datetime.datetime(1977, 7, 2)),
- ('44 weeks ago',
- datetime.datetime(2020, 8, 28)),
- ('15 minutes to 3am',
- datetime.datetime(2021, 7, 2, 2, 45)),
- ('quarter past 4pm',
- datetime.datetime(2021, 7, 2, 16, 15)),
- ('half past 9',
- datetime.datetime(2021, 7, 2, 9, 30)),
- ('4 seconds to midnight',
- datetime.datetime(2021, 7, 1, 23, 59, 56)),
- ('4 seconds to midnight, tomorrow',
- datetime.datetime(2021, 7, 2, 23, 59, 56)),
- ('2021/apr/15T21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
- ('2021/apr/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
- ('2021/4/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
- ('2021/04/15 at 21:30:44.55',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
- ('2021/04/15 at 21:30:44.55Z',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55,
- tzinfo=pytz.timezone('UTC'))),
- ('2021/04/15 at 21:30:44.55EST',
- datetime.datetime(2021, 4, 15, 21, 30, 44, 55,
- tzinfo=pytz.timezone('EST'))),
- ('13 days after last memorial day at 12 seconds before 4pm',
- datetime.datetime(2020, 6, 7, 15, 59, 48)),
- (' 2 days before yesterday at 9am ',
- datetime.datetime(2021, 6, 29, 9)),
- ('-3 days before today',
- datetime.datetime(2021, 7, 5)),
- ('3 days before yesterday at midnight EST',
- datetime.datetime(2021, 6, 28, tzinfo=pytz.timezone('EST')))
+ ('today', datetime.datetime(2021, 7, 2)),
+ ('tomorrow', datetime.datetime(2021, 7, 3)),
+ ('yesterday', datetime.datetime(2021, 7, 1)),
+ ('21:30', datetime.datetime(2021, 7, 2, 21, 30, 0, 0)),
+ ('12:01am', datetime.datetime(2021, 7, 2, 0, 1, 0, 0)),
+ ('12:02p', datetime.datetime(2021, 7, 2, 12, 2, 0, 0)),
+ ('0:03', datetime.datetime(2021, 7, 2, 0, 3, 0, 0)),
+ ('last wednesday', datetime.datetime(2021, 6, 30)),
+ ('this wed', datetime.datetime(2021, 7, 7)),
+ ('next wed', datetime.datetime(2021, 7, 14)),
+ ('this coming tues', datetime.datetime(2021, 7, 6)),
+ ('this past monday', datetime.datetime(2021, 6, 28)),
+ ('4 days ago', datetime.datetime(2021, 6, 28)),
+ ('4 mondays ago', datetime.datetime(2021, 6, 7)),
+ ('4 months ago', datetime.datetime(2021, 3, 2)),
+ ('3 days back', datetime.datetime(2021, 6, 29)),
+ ('13 weeks from now', datetime.datetime(2021, 10, 1)),
+ ('1 year from now', datetime.datetime(2022, 7, 2)),
+ ('4 weeks from now', datetime.datetime(2021, 7, 30)),
+ ('3 saturdays ago', datetime.datetime(2021, 6, 12)),
+ ('4 months from today', datetime.datetime(2021, 11, 2)),
+ ('4 years from yesterday', datetime.datetime(2025, 7, 1)),
+ ('4 weeks from tomorrow', datetime.datetime(2021, 7, 31)),
+ ('april 15, 2005', datetime.datetime(2005, 4, 15)),
+ ('april 14', datetime.datetime(2021, 4, 14)),
+ ('9:30am on last wednesday', datetime.datetime(2021, 6, 30, 9, 30)),
+ ('2005/apr/15', datetime.datetime(2005, 4, 15)),
+ ('2005 apr 15', datetime.datetime(2005, 4, 15)),
+ ('the 1st wednesday in may', datetime.datetime(2021, 5, 5)),
+ ('last sun of june', datetime.datetime(2021, 6, 27)),
+ ('this Easter', datetime.datetime(2021, 4, 4)),
+ ('last christmas', datetime.datetime(2020, 12, 25)),
+ ('last Xmas', datetime.datetime(2020, 12, 25)),
+ ('xmas, 1999', datetime.datetime(1999, 12, 25)),
+ ('next mlk day', datetime.datetime(2022, 1, 17)),
+ ('Halloween, 2020', datetime.datetime(2020, 10, 31)),
+ ('5 work days after independence day', datetime.datetime(2021, 7, 12)),
+ ('50 working days from last wed', datetime.datetime(2021, 9, 10)),
+ ('25 working days before columbus day', datetime.datetime(2021, 9, 3)),
+ ('today +1 week', datetime.datetime(2021, 7, 9)),
+ ('sunday -3 weeks', datetime.datetime(2021, 6, 13)),
+ ('4 weeks before xmas, 1999', datetime.datetime(1999, 11, 27)),
+ ('3 days before new years eve, 2000', datetime.datetime(2000, 12, 28)),
+ ('july 4th', datetime.datetime(2021, 7, 4)),
+ ('the ides of march', datetime.datetime(2021, 3, 15)),
+ ('the nones of april', datetime.datetime(2021, 4, 5)),
+ ('the kalends of may', datetime.datetime(2021, 5, 1)),
+ ('9/11/2001', datetime.datetime(2001, 9, 11)),
+ ('4 sundays before veterans\' day', datetime.datetime(2021, 10, 17)),
+ ('xmas eve', datetime.datetime(2021, 12, 24)),
+ ('this friday at 5pm', datetime.datetime(2021, 7, 9, 17, 0, 0)),
+ ('presidents day', datetime.datetime(2021, 2, 15)),
+ ('memorial day, 1921', datetime.datetime(1921, 5, 30)),
+ ('today -4 wednesdays', datetime.datetime(2021, 6, 9)),
+ ('thanksgiving', datetime.datetime(2021, 11, 25)),
+ ('2 sun in jun', datetime.datetime(2021, 6, 13)),
+ ('easter -40 days', datetime.datetime(2021, 2, 23)),
+ ('easter +39 days', datetime.datetime(2021, 5, 13)),
+ ('1st tuesday in nov, 2024', datetime.datetime(2024, 11, 5)),
+ (
+ '2 days before last xmas at 3:14:15.92a',
+ datetime.datetime(2020, 12, 23, 3, 14, 15, 92),
+ ),
+ (
+ '3 weeks after xmas, 1995 at midday',
+ datetime.datetime(1996, 1, 15, 12, 0, 0),
+ ),
+ (
+ '4 months before easter, 1992 at midnight',
+ datetime.datetime(1991, 12, 19),
+ ),
+ (
+ '5 months before halloween, 1995 at noon',
+ datetime.datetime(1995, 5, 31, 12),
+ ),
+ ('4 days before last wednesday', datetime.datetime(2021, 6, 26)),
+ ('44 months after today', datetime.datetime(2025, 3, 2)),
+ ('44 years before today', datetime.datetime(1977, 7, 2)),
+ ('44 weeks ago', datetime.datetime(2020, 8, 28)),
+ ('15 minutes to 3am', datetime.datetime(2021, 7, 2, 2, 45)),
+ ('quarter past 4pm', datetime.datetime(2021, 7, 2, 16, 15)),
+ ('half past 9', datetime.datetime(2021, 7, 2, 9, 30)),
+ ('4 seconds to midnight', datetime.datetime(2021, 7, 1, 23, 59, 56)),
+ (
+ '4 seconds to midnight, tomorrow',
+ datetime.datetime(2021, 7, 2, 23, 59, 56),
+ ),
+ ('2021/apr/15T21:30:44.55', datetime.datetime(2021, 4, 15, 21, 30, 44, 55)),
+ (
+ '2021/apr/15 at 21:30:44.55',
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ '2021/4/15 at 21:30:44.55',
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ '2021/04/15 at 21:30:44.55',
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55),
+ ),
+ (
+ '2021/04/15 at 21:30:44.55Z',
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone('UTC')),
+ ),
+ (
+ '2021/04/15 at 21:30:44.55EST',
+ datetime.datetime(2021, 4, 15, 21, 30, 44, 55, tzinfo=pytz.timezone('EST')),
+ ),
+ (
+ '13 days after last memorial day at 12 seconds before 4pm',
+ datetime.datetime(2020, 6, 7, 15, 59, 48),
+ ),
+ (
+ ' 2 days before yesterday at 9am ',
+ datetime.datetime(2021, 6, 29, 9),
+ ),
+ ('-3 days before today', datetime.datetime(2021, 7, 5)),
+ (
+ '3 days before yesterday at midnight EST',
+ datetime.datetime(2021, 6, 28, tzinfo=pytz.timezone('EST')),
+ ),
]
class TestDateparseUtils(unittest.TestCase):
-
@uu.check_method_for_perf_regressions
def test_dateparsing(self):
- dp = du.DateParser(
- override_now_for_test_purposes = datetime.datetime(2021, 7, 2)
- )
+ dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
for (txt, expected_dt) in parsable_expressions:
try:
self.assertEqual(
actual_dt,
expected_dt,
- f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"'
+ f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
)
except du.ParseException:
self.fail(f'Expected "{txt}" to parse successfully.')
def test_whitespace_handling(self):
- dp = du.DateParser(
- override_now_for_test_purposes = datetime.datetime(2021, 7, 2)
- )
+ dp = du.DateParser(override_now_for_test_purposes=datetime.datetime(2021, 7, 2))
for (txt, expected_dt) in parsable_expressions:
try:
txt = f' {txt} '
i = random.randint(2, 5)
replacement = ' ' * i
- txt = re.sub('\s', replacement, txt)
+ txt = re.sub(r'\s', replacement, txt)
actual_dt = dp.parse(txt)
self.assertIsNotNone(actual_dt)
self.assertEqual(
actual_dt,
expected_dt,
- f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"'
+ f'"{txt}", got "{actual_dt}" while expecting "{expected_dt}"',
)
except du.ParseException:
self.fail(f'Expected "{txt}" to parse successfully.')
import unittest
import decorator_utils as du
-
import unittest_utils as uu
class TestDecorators(unittest.TestCase):
-
def test_singleton(self):
-
@du.singleton
- class FooBar():
+ class FooBar:
pass
x = FooBar()
#!/usr/bin/env python3
-import random
import math
+import random
import unittest
import bootstrap
class TestLetterCompress(unittest.TestCase):
-
def test_with_random_strings(self):
alphabet = 'abcdefghijklmnopqrstuvwxyz .,"-'
for n in range(20):
class TestLoggingUtils(unittest.TestCase):
-
def test_output_context(self):
unique_suffix = sutils.generate_uuid(True)
filename = f'/tmp/logging_utils_test.{unique_suffix}'
with tempfile.SpooledTemporaryFile(mode='r+') as tmpfile1:
with uu.RecordStdout() as record:
with lutils.OutputMultiplexerContext(
- lutils.OutputMultiplexer.Destination.FILENAMES |
- lutils.OutputMultiplexer.Destination.FILEHANDLES |
- lutils.OutputMultiplexer.Destination.LOG_INFO,
- filenames = [filename, '/dev/null'],
- handles = [tmpfile1, sys.stdout],
+ lutils.OutputMultiplexer.Destination.FILENAMES
+ | lutils.OutputMultiplexer.Destination.FILEHANDLES
+ | lutils.OutputMultiplexer.Destination.LOG_INFO,
+ filenames=[filename, '/dev/null'],
+ handles=[tmpfile1, sys.stdout],
) as mplex:
mplex.print(secret_message, end='')
with uu.RecordMultipleStreams(sys.stderr, sys.stdout) as record:
print("This is a test!")
print("This is one too.", file=sys.stderr)
- self.assertEqual(record().readlines(),
- ["This is a test!\n", "This is one too.\n"])
+ self.assertEqual(record().readlines(), ["This is a test!\n", "This is one too.\n"])
if __name__ == '__main__':
import unittest
-from type.money import Money
import unittest_utils as uu
+from type.money import Money
class TestMoney(unittest.TestCase):
-
def test_basic_utility(self):
amount = Money(1.45)
another = Money.parse("USD 1.45")
class TestProfanityFilter(unittest.TestCase):
-
def test_basic_functionality(self):
p = pf.ProfanityFilter()
self.assertTrue(p.is_bad_word('shit'))
self.assertTrue(p.contains_bad_word('this is another fucking test'))
self.assertTrue(p.contains_bad_word('this is another fuckin test'))
- self.assertFalse(p.contains_bad_word('Mary had a little lamb whose fleese was white as snow.'))
+ self.assertFalse(
+ p.contains_bad_word('Mary had a little lamb whose fleese was white as snow.')
+ )
if __name__ == '__main__':
import unittest
-from type.rate import Rate
-from type.money import Money
-
import unittest_utils as uu
+from type.money import Money
+from type.rate import Rate
class TestRate(unittest.TestCase):
def test_basic_utility(self):
my_stock_returns = Rate(percent_change=-20.0)
my_portfolio = 1000.0
- self.assertAlmostEqual(
- 800.0,
- my_stock_returns.apply_to(my_portfolio)
- )
+ self.assertAlmostEqual(800.0, my_stock_returns.apply_to(my_portfolio))
my_bond_returns = Rate(percentage=104.5)
my_money = Money(500.0)
- self.assertAlmostEqual(
- Money(522.5),
- my_bond_returns.apply_to(my_money)
- )
+ self.assertAlmostEqual(Money(522.5), my_bond_returns.apply_to(my_money))
my_multiplier = Rate(multiplier=1.72)
my_nose_length = 3.2
- self.assertAlmostEqual(
- 5.504,
- my_multiplier.apply_to(my_nose_length)
- )
+ self.assertAlmostEqual(5.504, my_multiplier.apply_to(my_nose_length))
def test_conversions(self):
x = Rate(104.55)
import unittest
-from ansi import fg, bg, reset
import bootstrap
import string_utils as su
-
import unittest_utils as uu
+from ansi import bg, fg, reset
@uu.check_all_methods_for_perf_regressions()
class TestStringUtils(unittest.TestCase):
-
def test_is_none_or_empty(self):
self.assertTrue(su.is_none_or_empty(None))
self.assertTrue(su.is_none_or_empty(""))
s = f' {fg("red")} this is a test {bg("white")} this is a test {reset()} '
self.assertEqual(
su.strip_escape_sequences(s),
- ' this is a test this is a test '
+ ' this is a test this is a test ',
)
s = ' this is another test '
self.assertEqual(su.strip_escape_sequences(s), s)
subwidth = math.floor(width / len(strings))
retval = ""
for string in strings:
- string = justify_string(
- string, width=subwidth, alignment=alignment, padding=padding
- )
+ string = justify_string(string, width=subwidth, alignment=alignment, padding=padding)
retval += string
while len(retval) > width:
retval = retval.replace(' ', ' ', 1)
padding = padding[0]
first, *rest, last = string.split()
w = width - (len(first) + 1 + len(last) + 1)
- ret = (
- first
- + padding
- + distribute_strings(rest, width=w, padding=padding)
- + padding
- + last
- )
+ ret = first + padding + distribute_strings(rest, width=w, padding=padding) + padding + last
return ret
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount=self.centcount + other.centcount, currency=self.currency
+ centcount=self.centcount + other.centcount,
+ currency=self.currency,
)
else:
raise TypeError('Incompatible currencies in add expression')
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount=self.centcount - other.centcount, currency=self.currency
+ centcount=self.centcount - other.centcount,
+ currency=self.currency,
)
else:
raise TypeError('Incompatible currencies in add expression')
raise TypeError('can not multiply monetary quantities')
else:
return CentCount(
- centcount=int(self.centcount * float(other)), currency=self.currency
+ centcount=int(self.centcount * float(other)),
+ currency=self.currency,
)
def __truediv__(self, other):
if isinstance(other, CentCount):
if self.currency == other.currency:
return CentCount(
- centcount=other.centcount - self.centcount, currency=self.currency
+ centcount=other.centcount - self.centcount,
+ currency=self.currency,
)
else:
raise TypeError('Incompatible currencies in sub expression')
raise TypeError('In strict_mode only two moneys can be added')
else:
return CentCount(
- centcount=int(other) - self.centcount, currency=self.currency
+ centcount=int(other) - self.centcount,
+ currency=self.currency,
)
__rmul__ = __mul__
import enum
+
@enum.unique
class Location(enum.Enum):
UNKNOWN = 0
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount=self.amount + Decimal(float(other)), currency=self.currency
+ amount=self.amount + Decimal(float(other)),
+ currency=self.currency,
)
def __sub__(self, other):
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount=self.amount - Decimal(float(other)), currency=self.currency
+ amount=self.amount - Decimal(float(other)),
+ currency=self.currency,
)
def __mul__(self, other):
raise TypeError('can not multiply monetary quantities')
else:
return Money(
- amount=self.amount * Decimal(float(other)), currency=self.currency
+ amount=self.amount * Decimal(float(other)),
+ currency=self.currency,
)
def __truediv__(self, other):
raise TypeError('can not divide monetary quantities')
else:
return Money(
- amount=self.amount / Decimal(float(other)), currency=self.currency
+ amount=self.amount / Decimal(float(other)),
+ currency=self.currency,
)
def __float__(self):
raise TypeError('In strict_mode only two moneys can be added')
else:
return Money(
- amount=Decimal(float(other)) - self.amount, currency=self.currency
+ amount=Decimal(float(other)) - self.amount,
+ currency=self.currency,
)
__rmul__ = __mul__
AARON_AND_DANA = 4
AARON = 4
DANA = 4
-
import scott_secrets
logger = logging.getLogger(__name__)
-cfg = config.add_commandline_args(
- f'Logging ({__file__})', 'Args related to function decorators'
-)
+cfg = config.add_commandline_args(f'Logging ({__file__})', 'Args related to function decorators')
cfg.add_argument(
'--unittests_ignore_perf',
action='store_true',
def load_performance_data(self, method_id: str) -> Dict[str, List[float]]:
results = self.conn.execute(
- sa.text(
- f'SELECT * FROM runtimes_by_function WHERE function = "{method_id}";'
- )
+ sa.text(f'SELECT * FROM runtimes_by_function WHERE function = "{method_id}";')
)
ret: Dict[str, List[float]] = {method_id: []}
for result in results.all():
helper = FileBasedPerfRegressionDataPersister(filename)
elif config.config['unittests_persistance_strategy'] == 'DATABASE':
dbspec = config.config['unittests_perfdb_spec']
- dbspec = dbspec.replace(
- '<PASSWORD>', scott_secrets.MARIADB_UNITTEST_PERF_PASSWORD
- )
+ dbspec = dbspec.replace('<PASSWORD>', scott_secrets.MARIADB_UNITTEST_PERF_PASSWORD)
helper = DatabasePerfRegressionDataPersister(dbspec)
else:
raise Exception('Unknown/unexpected --unittests_persistance_strategy value')
import decorator_utils
import list_utils
-cfg = config.add_commandline_args(
- f'Unscramble! ({__file__})', 'A fast word unscrambler.'
-)
+cfg = config.add_commandline_args(f'Unscramble! ({__file__})', 'A fast word unscrambler.')
cfg.add_argument(
"--unscramble_indexfile",
help="Path to a file of signature -> word index.",
self.everyone_gone_since: Optional[datetime.datetime] = None
self.someone_home_since: Optional[datetime.datetime] = None
self.location = override_location
- self.detector: base_presence.PresenceDetection = (
- base_presence.PresenceDetection()
- )
+ self.detector: base_presence.PresenceDetection = base_presence.PresenceDetection()
super().__init__(
{
'poll_presence': override_update_interval_sec,