From dab5654d392f69fb00bed49cf8ffb80f37642ea5 Mon Sep 17 00:00:00 2001 From: Scott Gasch Date: Tue, 13 Apr 2021 13:55:12 -0700 Subject: [PATCH] Various sundry changes. --- ansi.py | 4 +- file_utils.py | 58 +++++++++++++++++++++++- input_utils.py | 3 ++ ml_model_trainer.py | 3 +- ml_quick_label.py | 105 ++++++++++++++++++++++++++++---------------- string_utils.py | 28 ++++++++++++ 6 files changed, 159 insertions(+), 42 deletions(-) diff --git a/ansi.py b/ansi.py index 476f0a4..dc9a315 100755 --- a/ansi.py +++ b/ansi.py @@ -1620,7 +1620,7 @@ def italic() -> str: def italics() -> str: - return "" + return italic() def underline() -> str: @@ -1632,7 +1632,7 @@ def strikethrough() -> str: def strike_through() -> str: - return "" + return strikethrough() def is_16color(num: int) -> bool: diff --git a/file_utils.py b/file_utils.py index eb8c2c0..7cc8b63 100644 --- a/file_utils.py +++ b/file_utils.py @@ -4,6 +4,7 @@ import datetime import errno +import hashlib import logging import os import time @@ -48,7 +49,35 @@ def create_path_if_not_exist(path, on_error=None): def does_file_exist(filename: str) -> bool: - return os.path.exists(filename) + return os.path.exists(filename) and os.path.isfile(filename) + + +def does_directory_exist(dirname: str) -> bool: + return os.path.exists(dirname) and os.path.isdir(dirname) + + +def does_path_exist(pathname: str) -> bool: + return os.path.exists(pathname) + + +def get_file_size(filename: str) -> int: + return os.path.getsize(filename) + + +def is_normal_file(filename: str) -> bool: + return os.path.isfile(filename) + + +def is_directory(filename: str) -> bool: + return os.path.isdir(filename) + + +def is_symlink(filename: str) -> bool: + return os.path.islink(filename) + + +def is_same_file(file1: str, file2: str) -> bool: + return os.path.samefile(file1, file2) def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]: @@ -78,6 +107,33 @@ def get_file_raw_ctime(filename: str) -> Optional[float]: return 
get_file_raw_timestamp(filename, lambda x: x.st_ctime) +def get_file_md5(filename: str) -> str: + file_hash = hashlib.md5() + with open(filename, "rb") as f: + chunk = f.read(8192) + while chunk: + file_hash.update(chunk) + chunk = f.read(8192) + return file_hash.hexdigest() + + +def set_file_raw_atime(filename: str, atime: float): + mtime = get_file_raw_mtime(filename) + os.utime(filename, (atime, mtime)) + + +def set_file_raw_mtime(filename: str, mtime: float): + atime = get_file_raw_atime(filename) + os.utime(filename, (atime, mtime)) + + +def set_file_raw_atime_and_mtime(filename: str, ts: float = None): + if ts is not None: + os.utime(filename, (ts, ts)) + else: + os.utime(filename, None) + + def convert_file_timestamp_to_datetime( filename: str, producer ) -> Optional[datetime.datetime]: diff --git a/input_utils.py b/input_utils.py index 913146a..b19bfe1 100644 --- a/input_utils.py +++ b/input_utils.py @@ -24,6 +24,7 @@ def single_keystroke_response( def _single_keystroke_response_internal( valid_responses: List[str], timeout_seconds=None ) -> str: + os_special_keystrokes = [3, 26] # ^C, ^Z if timeout_seconds is not None: signal.signal(signal.SIGALRM, _handle_timeout) signal.alarm(timeout_seconds) @@ -33,6 +34,8 @@ def single_keystroke_response( response = readchar.readchar() if response in valid_responses: break + if ord(response) in os_special_keystrokes: + break return response finally: if timeout_seconds is not None: diff --git a/ml_model_trainer.py b/ml_model_trainer.py index 22735c9..7804d86 100644 --- a/ml_model_trainer.py +++ b/ml_model_trainer.py @@ -222,8 +222,7 @@ class TrainingBlueprint(ABC): try: (key, value) = line.split(self.spec.key_value_delimiter) except Exception as e: - logger.exception(e) - print(f"WARNING: bad line '{line}', skipped") + logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped") continue key = key.strip() diff --git a/ml_quick_label.py b/ml_quick_label.py index 1c35982..5a112db 100644 --- a/ml_quick_label.py 
+++ b/ml_quick_label.py @@ -1,13 +1,15 @@ #!/usr/bin/env python3 import glob +import logging import os -from typing import Callable, List, NamedTuple, Set +from typing import Callable, List, NamedTuple, Optional, Set import argparse_utils import config import input_utils +logger = logging.getLogger(__name__) parser = config.add_commandline_args( f"ML Quick Labeler ({__file__})", "Args related to quick labeling of ML training data", @@ -17,12 +19,25 @@ parser.add_argument( default="./qlabel_skip_list.txt", metavar="FILENAME", type=argparse_utils.valid_filename, - help="Path to file in which to store already labeled data", + help="Path to file in which to store already labeled data.", +) +parser.add_argument( + "--ml_quick_label_use_skip_lists", + default=True, + action=argparse_utils.ActionNoYes, + help='Should we use a skip list file to speed up execution?', +) +parser.add_argument( + "--ml_quick_label_overwrite_labels", + default=False, + action=argparse_utils.ActionNoYes, + help='Enable overwriting existing labels; default is to not relabel.', ) class InputSpec(NamedTuple): - image_file_glob: str + image_file_glob: Optional[str] + image_file_prepopulated_list: Optional[List[str]] image_file_to_features_file: Callable[[str], str] label: str valid_keystrokes: List[str] @@ -32,60 +47,76 @@ class InputSpec(NamedTuple): def read_skip_list() -> Set[str]: ret: Set[str] = set() - quick_skip_file = config.config['ml_quick_label_skip_list_path'] - if not os.path.exists(quick_skip_file): - return ret - with open(quick_skip_file, 'r') as f: - lines = f.readlines() - for line in lines: - line = line[:-1] - line.strip() - ret.add(line) + if config.config['ml_quick_label_use_skip_lists']: + quick_skip_file = config.config['ml_quick_label_skip_list_path'] + if os.path.exists(quick_skip_file): + with open(quick_skip_file, 'r') as f: + lines = f.readlines() + for line in lines: + line = line[:-1] + line.strip() + ret.add(line) + logger.debug(f'Read {quick_skip_file} and found 
{len(ret)} entries.') return ret def write_skip_list(skip_list) -> None: - quick_skip_file = config.config['ml_quick_label_skip_list_path'] - with open(quick_skip_file, 'w') as f: - for filename in skip_list: - filename = filename.strip() - if len(filename) > 0: - f.write(f'{filename}\n') + if config.config['ml_quick_label_use_skip_lists']: + quick_skip_file = config.config['ml_quick_label_skip_list_path'] + with open(quick_skip_file, 'w') as f: + for filename in skip_list: + filename = filename.strip() + if len(filename) > 0: + f.write(f'{filename}\n') + logger.debug(f'Updated {quick_skip_file}') def label(in_spec: InputSpec) -> None: - images = glob.glob(in_spec.image_file_glob) + images = [] + if in_spec.image_file_glob is not None: + images += glob.glob(in_spec.image_file_glob) + elif in_spec.image_file_prepopulated_list is not None: + images += in_spec.image_file_prepopulated_list + else: + raise ValueError( + 'One of image_file_glob or image_file_prepopulated_list is required' + ) skip_list = read_skip_list() for image in images: if image in skip_list: + logger.debug(f'Skipping {image} because of the skip list') continue features = in_spec.image_file_to_features_file(image) if features is None or not os.path.exists(features): + logger.warning( + f'File {image} yielded file {features} which does not exist, SKIPPING.' + ) continue # Render features and image. 
+ filtered_lines = [] with open(features, "r") as f: lines = f.readlines() - skip = False + saw_label = False for line in lines: line = line[:-1] - if in_spec.label in line: - skip = True - if skip: + if in_spec.label not in line: + filtered_lines.append(line) + else: + saw_label = True + + if not saw_label or config.config['ml_quick_label_overwrite_labels']: + logger.info(features) + os.system(f'xv {image} &') + keystroke = input_utils.single_keystroke_response( + in_spec.valid_keystrokes, + prompt=in_spec.prompt, + ) + os.system('killall xv') + label_value = in_spec.keystroke_to_label(keystroke) + filtered_lines.append(f"{in_spec.label}: {label_value}\n") + with open(features, 'w') as f: + f.writelines("%s\n" % line for line in filtered_lines) skip_list.add(image) - continue - - os.system(f'xv {image} &') - keystroke = input_utils.single_keystroke_response( - in_spec.valid_keystrokes, - prompt=in_spec.prompt, - ) - os.system('killall xv') - - label_value = in_spec.keystroke_to_label(keystroke) - with open(features, "a") as f: - f.write(f"{in_spec.label}: {label_value}\n") - skip_list.add(image) - write_skip_list(skip_list) diff --git a/string_utils.py b/string_utils.py index b586ae1..83575ff 100644 --- a/string_utils.py +++ b/string_utils.py @@ -1,5 +1,6 @@ #!/usr/bin/env python3 +from itertools import zip_longest import json import random import re @@ -220,6 +221,33 @@ def strip_escape_sequences(in_str: str) -> str: return in_str +def add_thousands_separator(in_str: str, *, separator_char = ',', places = 3) -> str: + if isinstance(in_str, int): + in_str = f'{in_str}' + + if is_number(in_str): + return _add_thousands_separator( + in_str, + separator_char = separator_char, + places = places + ) + raise ValueError(in_str) + + +def _add_thousands_separator(in_str: str, *, separator_char = ',', places = 3) -> str: + decimal_part = "" + if '.' 
in in_str: + (in_str, decimal_part) = in_str.split('.') + tmp = [iter(in_str[::-1])] * places + ret = separator_char.join( + "".join(x) for x in zip_longest(*tmp, fillvalue=""))[::-1] + if len(decimal_part) > 0: + ret += '.' + ret += decimal_part + return ret + + + # Full url example: # scheme://username:password@www.domain.com:8042/folder/subfolder/file.extension?param=value&param2=value2#hash def is_url(in_str: Any, allowed_schemes: Optional[List[str]] = None) -> bool: -- 2.46.0