Various sundry changes.
authorScott Gasch <[email protected]>
Tue, 13 Apr 2021 20:55:12 +0000 (13:55 -0700)
committerScott Gasch <[email protected]>
Tue, 13 Apr 2021 20:55:12 +0000 (13:55 -0700)
ansi.py
file_utils.py
input_utils.py
ml_model_trainer.py
ml_quick_label.py
string_utils.py

diff --git a/ansi.py b/ansi.py
index 476f0a4dd9f80c111567a6b8c5395ac17f2686a2..dc9a31542f4551a73d9ab1de341b7fce11e85f47 100755 (executable)
--- a/ansi.py
+++ b/ansi.py
@@ -1620,7 +1620,7 @@ def italic() -> str:
 
 
 def italics() -> str:
-    return "\e[3m"
+    return italic()
 
 
 def underline() -> str:
@@ -1632,7 +1632,7 @@ def strikethrough() -> str:
 
 
 def strike_through() -> str:
-    return "\e[9m"
+    return strikethrough()
 
 
 def is_16color(num: int) -> bool:
index eb8c2c0dbe336a165068b724c4162dc3667a89bc..7cc8b632ac692d47a6f272403100c6806dc19136 100644 (file)
@@ -4,6 +4,7 @@
 
 import datetime
 import errno
+import hashlib
 import logging
 import os
 import time
@@ -48,7 +49,35 @@ def create_path_if_not_exist(path, on_error=None):
 
 
 def does_file_exist(filename: str) -> bool:
-    return os.path.exists(filename)
+    return os.path.exists(filename) and os.path.isfile(filename)
+
+
+def does_directory_exist(dirname: str) -> bool:
+    return os.path.exists(dirname) and os.path.isdir(dirname)
+
+
+def does_path_exist(pathname: str) -> bool:
+    return os.path.exists(pathname)
+
+
+def get_file_size(filename: str) -> int:
+    return os.path.getsize(filename)
+
+
+def is_normal_file(filename: str) -> bool:
+    return os.path.isfile(filename)
+
+
+def is_directory(filename: str) -> bool:
+    return os.path.isdir(filename)
+
+
+def is_symlink(filename: str) -> bool:
+    return os.path.islink(filename)
+
+
+def is_same_file(file1: str, file2: str) -> bool:
+    return os.path.samefile(file1, file2)
 
 
 def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
@@ -78,6 +107,33 @@ def get_file_raw_ctime(filename: str) -> Optional[float]:
     return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
 
 
+def get_file_md5(filename: str) -> str:
+    file_hash = hashlib.md5()
+    with open(filename, "rb") as f:
+        chunk = f.read(8192)
+        while chunk:
+            file_hash.update(chunk)
+            chunk = f.read(8192)
+    return file_hash.hexdigest()
+
+
+def set_file_raw_atime(filename: str, atime: float):
+    mtime = get_file_raw_mtime(filename)
+    os.utime(filename, (atime, mtime))
+
+
+def set_file_raw_mtime(filename: str, mtime: float):
+    atime = get_file_raw_atime(filename)
+    os.utime(filename, (atime, mtime))
+
+
+def set_file_raw_atime_and_mtime(filename: str, ts: float = None):
+    if ts is not None:
+        os.utime(filename, (ts, ts))
+    else:
+        os.utime(filename, None)
+
+
 def convert_file_timestamp_to_datetime(
     filename: str, producer
 ) -> Optional[datetime.datetime]:
index 913146a313608398d902a03eef7fe824399cd6fc..b19bfe16726dd5995a9d8db52cc7be49b67c8201 100644 (file)
@@ -24,6 +24,7 @@ def single_keystroke_response(
     def _single_keystroke_response_internal(
         valid_responses: List[str], timeout_seconds=None
     ) -> str:
+        os_special_keystrokes = [3, 26]  # ^C, ^Z
         if timeout_seconds is not None:
             signal.signal(signal.SIGALRM, _handle_timeout)
             signal.alarm(timeout_seconds)
@@ -33,6 +34,8 @@ def single_keystroke_response(
                 response = readchar.readchar()
                 if response in valid_responses:
                     break
+                if ord(response) in os_special_keystrokes:
+                    break
             return response
         finally:
             if timeout_seconds is not None:
index 22735c90c87f585bfe115521786eda97ef887a49..7804d86b4744be0ee85ec1a5d0c6f113dd7030d2 100644 (file)
@@ -222,8 +222,7 @@ class TrainingBlueprint(ABC):
                 try:
                     (key, value) = line.split(self.spec.key_value_delimiter)
                 except Exception as e:
-                    logger.exception(e)
-                    print(f"WARNING: bad line '{line}', skipped")
+                    logger.debug(f"WARNING: bad line in file {filename} '{line}', skipped")
                     continue
 
                 key = key.strip()
index 1c359828003110d4358d45bd8e5a825f631569c2..5a112db3d22e7dfc5d839306f4b78ba0648ae7fe 100644 (file)
@@ -1,13 +1,15 @@
 #!/usr/bin/env python3
 
 import glob
+import logging
 import os
-from typing import Callable, List, NamedTuple, Set
+from typing import Callable, List, NamedTuple, Optional, Set
 
 import argparse_utils
 import config
 import input_utils
 
+logger = logging.getLogger(__name__)
 parser = config.add_commandline_args(
     f"ML Quick Labeler ({__file__})",
     "Args related to quick labeling of ML training data",
@@ -17,12 +19,25 @@ parser.add_argument(
     default="./qlabel_skip_list.txt",
     metavar="FILENAME",
     type=argparse_utils.valid_filename,
-    help="Path to file in which to store already labeled data",
+    help="Path to file in which to store already labeled data.",
+)
+parser.add_argument(
+    "--ml_quick_label_use_skip_lists",
+    default=True,
+    action=argparse_utils.ActionNoYes,
+    help='Should we use a skip list file to speed up execution?',
+)
+parser.add_argument(
+    "--ml_quick_label_overwrite_labels",
+    default=False,
+    action=argparse_utils.ActionNoYes,
+    help='Enable overwriting existing labels; default is to not relabel.',
 )
 
 
 class InputSpec(NamedTuple):
-    image_file_glob: str
+    image_file_glob: Optional[str]
+    image_file_prepopulated_list: Optional[List[str]]
     image_file_to_features_file: Callable[[str], str]
     label: str
     valid_keystrokes: List[str]
@@ -32,60 +47,76 @@ class InputSpec(NamedTuple):
 
 def read_skip_list() -> Set[str]:
     ret: Set[str] = set()
-    quick_skip_file = config.config['ml_quick_label_skip_list_path']
-    if not os.path.exists(quick_skip_file):
-        return ret
-    with open(quick_skip_file, 'r') as f:
-        lines = f.readlines()
-    for line in lines:
-        line = line[:-1]
-        line.strip()
-        ret.add(line)
+    if config.config['ml_quick_label_use_skip_lists']:
+        quick_skip_file = config.config['ml_quick_label_skip_list_path']
+        if os.path.exists(quick_skip_file):
+            with open(quick_skip_file, 'r') as f:
+                lines = f.readlines()
+            for line in lines:
+                line = line[:-1]
+                line.strip()
+                ret.add(line)
+        logger.debug(f'Read {quick_skip_file} and found {len(ret)} entries.')
     return ret
 
 
 def write_skip_list(skip_list) -> None:
-    quick_skip_file = config.config['ml_quick_label_skip_list_path']
-    with open(quick_skip_file, 'w') as f:
-        for filename in skip_list:
-            filename = filename.strip()
-            if len(filename) > 0:
-                f.write(f'{filename}\n')
+    if config.config['ml_quick_label_use_skip_lists']:
+        quick_skip_file = config.config['ml_quick_label_skip_list_path']
+        with open(quick_skip_file, 'w') as f:
+            for filename in skip_list:
+                filename = filename.strip()
+                if len(filename) > 0:
+                    f.write(f'{filename}\n')
+        logger.debug(f'Updated {quick_skip_file}')
 
 
 def label(in_spec: InputSpec) -> None:
-    images = glob.glob(in_spec.image_file_glob)
+    images = []
+    if in_spec.image_file_glob is not None:
+        images += glob.glob(in_spec.image_file_glob)
+    elif in_spec.image_file_prepopulated_list is not None:
+        images += in_spec.image_file_prepopulated_list
+    else:
+        raise ValueError(
+            'One of image_file_glob or image_file_prepopulated_list is required'
+        )
 
     skip_list = read_skip_list()
     for image in images:
         if image in skip_list:
+            logger.debug(f'Skipping {image} because of the skip list')
             continue
         features = in_spec.image_file_to_features_file(image)
         if features is None or not os.path.exists(features):
+            logger.warning(
+                f'File {image} yielded file {features} which does not exist, SKIPPING.'
+            )
             continue
 
         # Render features and image.
+        filtered_lines = []
         with open(features, "r") as f:
             lines = f.readlines()
-        skip = False
+        saw_label = False
         for line in lines:
             line = line[:-1]
-            if in_spec.label in line:
-                skip = True
-        if skip:
+            if in_spec.label not in line:
+                filtered_lines.append(line)
+            else:
+                saw_label = True
+
+        if not saw_label or config.config['ml_quick_label_overwrite_labels']:
+            logger.info(features)
+            os.system(f'xv {image} &')
+            keystroke = input_utils.single_keystroke_response(
+                in_spec.valid_keystrokes,
+                prompt=in_spec.prompt,
+            )
+            os.system('killall xv')
+            label_value = in_spec.keystroke_to_label(keystroke)
+            filtered_lines.append(f"{in_spec.label}: {label_value}\n")
+            with open(features, 'w') as f:
+                f.writelines("%s\n" % line for line in filtered_lines)
             skip_list.add(image)
-            continue
-
-        os.system(f'xv {image} &')
-        keystroke = input_utils.single_keystroke_response(
-            in_spec.valid_keystrokes,
-            prompt=in_spec.prompt,
-        )
-        os.system('killall xv')
-
-        label_value = in_spec.keystroke_to_label(keystroke)
-        with open(features, "a") as f:
-            f.write(f"{in_spec.label}: {label_value}\n")
-        skip_list.add(image)
-
     write_skip_list(skip_list)
index b586ae1a7e82d62e92ba567b20e5a440254fe8b3..83575ff47ce878a93f5237565e066abac57a0b1a 100644 (file)
@@ -1,5 +1,6 @@
 #!/usr/bin/env python3
 
+from itertools import zip_longest
 import json
 import random
 import re
@@ -220,6 +221,33 @@ def strip_escape_sequences(in_str: str) -> str:
     return in_str
 
 
+def add_thousands_separator(in_str: str, *, separator_char = ',', places = 3) -> str:
+    if isinstance(in_str, int):
+        in_str = f'{in_str}'
+
+    if is_number(in_str):
+        return _add_thousands_separator(
+            in_str,
+            separator_char = separator_char,
+            places = places
+        )
+    raise ValueError(in_str)
+
+
+def _add_thousands_separator(in_str: str, *, separator_char = ',', places = 3) -> str:
+    decimal_part = ""
+    if '.' in in_str:
+        (in_str, decimal_part) = in_str.split('.')
+    tmp = [iter(in_str[::-1])] * places
+    ret = separator_char.join(
+        "".join(x) for x in zip_longest(*tmp, fillvalue=""))[::-1]
+    if len(decimal_part) > 0:
+        ret += '.'
+        ret += decimal_part
+    return ret
+
+
+
 # Full url example:
 # scheme://username:[email protected]:8042/folder/subfolder/file.extension?param=value&param2=value2#hash
 def is_url(in_str: Any, allowed_schemes: Optional[List[str]] = None) -> bool: