caused by assert foo when foo was an int with valid valid 0.
15 files changed:
return x
path = self.parent_path(node)
return x
path = self.parent_path(node)
+ assert path[-1] is not None
assert path[-1] == node
path = path[:-1]
path.reverse()
for ancestor in path:
assert path[-1] == node
path = path[:-1]
path.reverse()
for ancestor in path:
+ assert ancestor is not None
if node != ancestor.right:
return ancestor
node = ancestor
if node != ancestor.right:
return ancestor
node = ancestor
super().__init__()
self.name = name
self._serializer = PickleSerializer()
super().__init__()
self.name = name
self._serializer = PickleSerializer()
+ assert size_bytes is None or size_bytes > 0
self.shared_memory = self._get_or_create_memory_block(name, size_bytes)
self._ensure_memory_initialization()
self.lock = RLock()
self.shared_memory = self._get_or_create_memory_block(name, size_bytes)
self._ensure_memory_initialization()
self.lock = RLock()
try:
return shared_memory.SharedMemory(name=name)
except FileNotFoundError:
try:
return shared_memory.SharedMemory(name=name)
except FileNotFoundError:
+ assert size_bytes is not None
return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)
def _ensure_memory_initialization(self):
return shared_memory.SharedMemory(name=name, create=True, size=size_bytes)
def _ensure_memory_initialization(self):
# Apply resudual adjustments to times here when we have a
# datetime.
self.datetime = self.datetime + self.timedelta
# Apply resudual adjustments to times here when we have a
# datetime.
self.datetime = self.datetime + self.timedelta
+ assert self.datetime is not None
self.time = datetime.time(
self.datetime.hour,
self.datetime.minute,
self.time = datetime.time(
self.datetime.hour,
self.datetime.minute,
logger.exception(e)
print("Unrecognized.")
else:
logger.exception(e)
print("Unrecognized.")
else:
print(dt.strftime('%A %Y/%m/%d %H:%M:%S.%f %Z(%z)'))
sys.exit(0)
print(dt.strftime('%A %Y/%m/%d %H:%M:%S.%f %Z(%z)'))
sys.exit(0)
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
+ assert mtime is not None
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
logger.debug(f'Computed/stored {filename}\'s MD5 at ts={mtime} ({md5})')
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
assert file_utils.does_file_exist(filename)
if mtime is None:
mtime = file_utils.get_file_raw_mtime(filename)
+ assert mtime is not None
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
self.mtime_by_filename[filename] = mtime
if self.mtime_by_filename.get(filename, 0) != mtime:
md5 = file_utils.get_file_md5(filename)
self.mtime_by_filename[filename] = mtime
self.all_md5s.add(md5)
def apply(self, item: Any, ignored_filename: str = None) -> bool:
self.all_md5s.add(md5)
def apply(self, item: Any, ignored_filename: str = None) -> bool:
- assert not ignored_filename
+ assert ignored_filename is None
self._update()
mem_hash = hashlib.md5()
mem_hash.update(item)
self._update()
mem_hash = hashlib.md5()
mem_hash.update(item)
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
self.in_flight_bundles_by_worker[worker].remove(uuid)
if not was_cancelled:
start = self.start_per_bundle[uuid]
+ assert start is not None
bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
bundle_latency = ts - start
x = self.finished_bundle_timings_per_worker.get(worker, list())
x.append(bundle_latency)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
worker = None
while worker is None:
worker = self.find_available_worker_or_block(avoid_machine)
+ assert worker is not None
# Ok, found a worker.
bundle.worker = worker
# Ok, found a worker.
bundle.worker = worker
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
self, p: Optional[subprocess.Popen], bundle: BundleDetails, depth: int
) -> Any:
machine = bundle.machine
pid = p.pid
if depth > 3:
logger.error(
pid = p.pid
if depth > 3:
logger.error(
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
# Tell the original to stop if we finished first.
if not was_cancelled:
orig_bundle = bundle.src_bundle
+ assert orig_bundle is not None
logger.debug(
f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
logger.debug(
f'{bundle}: Notifying original {orig_bundle.uuid} we beat them to it.'
)
def set_file_raw_atime(filename: str, atime: float):
mtime = get_file_raw_mtime(filename)
def set_file_raw_atime(filename: str, atime: float):
mtime = get_file_raw_mtime(filename)
+ assert mtime is not None
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
atime = get_file_raw_atime(filename)
os.utime(filename, (atime, mtime))
def set_file_raw_mtime(filename: str, mtime: float):
atime = get_file_raw_atime(filename)
+ assert atime is not None
os.utime(filename, (atime, mtime))
os.utime(filename, (atime, mtime))
last_bucket_start = bucket[0] # beginning of range
if max_population is None or pop > max_population:
max_population = pop # bucket with max items
last_bucket_start = bucket[0] # beginning of range
if max_population is None or pop > max_population:
max_population = pop # bucket with max items
- if max_population is None:
+ if len(self.buckets) == 0 or max_population is None:
return txt
max_label_width: Optional[int] = None
return txt
max_label_width: Optional[int] = None
max_label_width = label_width
if start == last_bucket_start:
break
max_label_width = label_width
if start == last_bucket_start:
break
- assert max_label_width
- assert lowest_start
- assert highest_end
+ assert max_label_width is not None
+ assert lowest_start is not None
+ assert highest_end is not None
sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
lowest_start,
sigma_label = f'[{label_formatter}..{label_formatter}): ' % (
lowest_start,
if config.config['logging_syslog_facility']:
facility_name = 'LOG_' + config.config['logging_syslog_facility']
facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER) # type: ignore
if config.config['logging_syslog_facility']:
facility_name = 'LOG_' + config.config['logging_syslog_facility']
facility = SysLogHandler.__dict__.get(facility_name, SysLogHandler.LOG_USER) # type: ignore
+ assert facility is not None
handler = SysLogHandler(facility=facility, address='/dev/log')
handler.setFormatter(
MillisecondAwareFormatter(
handler = SysLogHandler(facility=facility, address='/dev/log')
handler.setFormatter(
MillisecondAwareFormatter(
print(msg)
logger.info(msg)
print(msg)
logger.info(msg)
- assert best_training_score
- assert best_test_score
- assert best_params
+ assert best_training_score is not None
+ assert best_test_score is not None
+ assert best_params is not None
(
scaler_filename,
model_filename,
(
scaler_filename,
model_filename,
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
+ assert mtime is not None
now = datetime.datetime.now()
return mtime.month == now.month and mtime.day == now.day and mtime.year == now.year
now = datetime.datetime.now()
return mtime.month == now.month and mtime.day == now.day and mtime.year == now.year
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
return False
mtime = file_utils.get_file_mtime_as_datetime(filename)
+ assert mtime is not None
now = datetime.datetime.now()
return (now - mtime).total_seconds() <= limit_seconds
now = datetime.datetime.now()
return (now - mtime).total_seconds() <= limit_seconds
name = self.camera_name
assert name is not None
if name == 'driveway':
name = self.camera_name
assert name is not None
if name == 'driveway':
- return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mjpeg/GKlT2FfiSQ/driveway'
+ return 'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mjpeg/GKlT2FfiSQ/driveway'
else:
return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mp4/GKlT2FfiSQ/{name}/s.mp4'
else:
return f'http://10.0.0.226:8080/Umtxxf1uKMBniFblqeQ9KRbb6DDzN4/mp4/GKlT2FfiSQ/{name}/s.mp4'
BaseChromecast.ccasts,
BaseChromecast.browser,
) = pychromecast.get_chromecasts(timeout=15.0)
BaseChromecast.ccasts,
BaseChromecast.browser,
) = pychromecast.get_chromecasts(timeout=15.0)
- assert BaseChromecast.browser
+ assert BaseChromecast.browser is not None
atexit.register(BaseChromecast.browser.stop_discovery)
BaseChromecast.refresh_ts = now
atexit.register(BaseChromecast.browser.stop_discovery)
BaseChromecast.refresh_ts = now
@overrides
def turn_on(self) -> bool:
self.lazy_initialize_device()
@overrides
def turn_on(self) -> bool:
self.lazy_initialize_device()
- assert self.meross_wrapper
- assert self.device
+ assert self.meross_wrapper is not None
+ assert self.device is not None
self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
return True
@overrides
def turn_off(self) -> bool:
self.lazy_initialize_device()
self.meross_wrapper.loop.run_until_complete(self.device.async_turn_on())
return True
@overrides
def turn_off(self) -> bool:
self.lazy_initialize_device()
- assert self.meross_wrapper
- assert self.device
+ assert self.meross_wrapper is not None
+ assert self.device is not None
self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
return True
@overrides
def is_on(self) -> bool:
self.lazy_initialize_device()
self.meross_wrapper.loop.run_until_complete(self.device.async_turn_off())
return True
@overrides
def is_on(self) -> bool:
self.lazy_initialize_device()
+ assert self.device is not None
return self.device.is_on()
@overrides
return self.device.is_on()
@overrides
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
self.recorder = contextlib.redirect_stdout(self.destination)
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
self.recorder = contextlib.redirect_stdout(self.destination)
+ assert self.recorder is not None
self.recorder.__enter__()
return lambda: self.destination
def __exit__(self, *args) -> Optional[bool]:
self.recorder.__enter__()
return lambda: self.destination
def __exit__(self, *args) -> Optional[bool]:
+ assert self.recorder is not None
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
self.recorder = contextlib.redirect_stderr(self.destination) # type: ignore
def __enter__(self) -> Callable[[], tempfile.SpooledTemporaryFile]:
self.recorder = contextlib.redirect_stderr(self.destination) # type: ignore
+ assert self.recorder is not None
self.recorder.__enter__()
return lambda: self.destination
def __exit__(self, *args) -> Optional[bool]:
self.recorder.__enter__()
return lambda: self.destination
def __exit__(self, *args) -> Optional[bool]:
+ assert self.recorder is not None
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
self.recorder.__exit__(*args)
self.destination.seek(0)
return None
if self.someone_is_home is None:
raise Exception("Too Soon!")
if self.someone_is_home:
if self.someone_is_home is None:
raise Exception("Too Soon!")
if self.someone_is_home:
- assert self.someone_home_since
+ assert self.someone_home_since is not None
return (True, self.someone_home_since)
else:
return (True, self.someone_home_since)
else:
- assert self.everyone_gone_since
+ assert self.everyone_gone_since is not None
return (False, self.everyone_gone_since)
return (False, self.everyone_gone_since)