import datetime
import errno
+import hashlib
import logging
import os
import time
def does_file_exist(filename: str) -> bool:
    """Return True if *filename* exists and is a regular file.

    Symbolic links are followed, so a link that points at a regular file
    also counts.  Directories and missing paths yield False.
    """
    # os.path.isfile() already returns False for a missing path, so a
    # separate os.path.exists() call only costs an extra stat.
    return os.path.isfile(filename)
+
+
def does_directory_exist(dirname: str) -> bool:
    """Return True if *dirname* exists and is a directory.

    Symbolic links are followed.  Regular files and missing paths yield
    False.
    """
    # os.path.isdir() already returns False for a missing path, so a
    # separate os.path.exists() call is redundant.
    return os.path.isdir(dirname)
+
+
def does_path_exist(pathname: str) -> bool:
    """Return True if *pathname* refers to an existing path of any kind.

    This is a thin wrapper around os.path.exists(), so a broken symbolic
    link yields False.
    """
    return os.path.exists(pathname)
+
+
def get_file_size(filename: str) -> int:
    """Return the size of *filename* in bytes.

    Raises:
        OSError: propagated from os.path.getsize() when the file does not
            exist or cannot be accessed.
    """
    return os.path.getsize(filename)
+
+
def is_normal_file(filename: str) -> bool:
    """Return True if *filename* is an existing regular file.

    Thin wrapper around os.path.isfile(); symbolic links are followed.
    """
    return os.path.isfile(filename)
+
+
def is_directory(filename: str) -> bool:
    """Return True if *filename* is an existing directory.

    Thin wrapper around os.path.isdir(); symbolic links are followed.
    """
    return os.path.isdir(filename)
+
+
def is_symlink(filename: str) -> bool:
    """Return True if *filename* is itself a symbolic link.

    Thin wrapper around os.path.islink(); the link target is not
    examined, so a dangling link still yields True.
    """
    return os.path.islink(filename)
+
+
def is_same_file(file1: str, file2: str) -> bool:
    """Return True if both paths refer to the same underlying file.

    Thin wrapper around os.path.samefile().

    Raises:
        OSError: propagated from os.path.samefile() when either path
            cannot be stat'ed (e.g. it does not exist).
    """
    return os.path.samefile(file1, file2)
def get_file_raw_timestamps(filename: str) -> Optional[os.stat_result]:
return get_file_raw_timestamp(filename, lambda x: x.st_ctime)
def get_file_md5(filename: str, chunk_size: int = 8192) -> str:
    """Return the hexadecimal MD5 digest of the contents of *filename*.

    The file is read in binary mode, *chunk_size* bytes at a time, so
    arbitrarily large files can be hashed without loading them whole.

    Args:
        filename: path of the file to hash.
        chunk_size: number of bytes to read per iteration.

    Returns:
        The digest as a lowercase hex string.

    Raises:
        OSError: propagated from open() when the file cannot be read.
    """
    file_hash = hashlib.md5()
    with open(filename, "rb") as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            file_hash.update(chunk)
    return file_hash.hexdigest()
+
+
def set_file_raw_atime(filename: str, atime: float) -> None:
    """Set the access time of *filename* to *atime*, keeping its mtime.

    Args:
        filename: path of the file to touch.
        atime: new access time, in seconds since the epoch.
    """
    # os.utime() always sets both timestamps, so the current mtime is read
    # first and written back unchanged.
    # NOTE(review): assumes get_file_raw_mtime() returns a number here;
    # confirm it cannot return None for an existing file.
    mtime = get_file_raw_mtime(filename)
    os.utime(filename, (atime, mtime))
+
+
def set_file_raw_mtime(filename: str, mtime: float) -> None:
    """Set the modification time of *filename* to *mtime*, keeping its atime.

    Args:
        filename: path of the file to touch.
        mtime: new modification time, in seconds since the epoch.
    """
    # os.utime() always sets both timestamps, so the current atime is read
    # first and written back unchanged.
    # NOTE(review): assumes get_file_raw_atime() returns a number here;
    # confirm it cannot return None for an existing file.
    atime = get_file_raw_atime(filename)
    os.utime(filename, (atime, mtime))
+
+
def set_file_raw_atime_and_mtime(filename: str, ts: Optional[float] = None) -> None:
    """Set both the access and modification time of *filename*.

    Args:
        filename: path of the file to touch.
        ts: timestamp in seconds since the epoch to use for both values.
            When None, os.utime() is asked to use the current time.
    """
    # Compare against None explicitly so that ts == 0.0 (the epoch) is
    # honoured rather than treated as "not given".
    times = None if ts is None else (ts, ts)
    os.utime(filename, times)
+
+
def convert_file_timestamp_to_datetime(
filename: str, producer
) -> Optional[datetime.datetime]:
#!/usr/bin/env python3
import glob
+import logging
import os
-from typing import Callable, List, NamedTuple, Set
+from typing import Callable, List, NamedTuple, Optional, Set
import argparse_utils
import config
import input_utils
+logger = logging.getLogger(__name__)
parser = config.add_commandline_args(
f"ML Quick Labeler ({__file__})",
"Args related to quick labeling of ML training data",
default="./qlabel_skip_list.txt",
metavar="FILENAME",
type=argparse_utils.valid_filename,
- help="Path to file in which to store already labeled data",
+ help="Path to file in which to store already labeled data.",
+)
# Gates both reading and writing of the skip-list file (checked in
# read_skip_list() and write_skip_list()).
parser.add_argument(
    "--ml_quick_label_use_skip_lists",
    default=True,
    action=argparse_utils.ActionNoYes,
    help='Should we use a skip list file to speed up execution?',
)
# When set, label() prompts again even for feature files that already
# contain the label, and replaces the existing label line.
parser.add_argument(
    "--ml_quick_label_overwrite_labels",
    default=False,
    action=argparse_utils.ActionNoYes,
    help='Enable overwriting existing labels; default is to not relabel.',
)
class InputSpec(NamedTuple):
- image_file_glob: str
+ image_file_glob: Optional[str]
+ image_file_prepopulated_list: Optional[List[str]]
image_file_to_features_file: Callable[[str], str]
label: str
valid_keystrokes: List[str]
def read_skip_list() -> Set[str]:
    """Load the set of already-handled image paths from the skip-list file.

    Returns:
        The entries of the file named by the 'ml_quick_label_skip_list_path'
        config value, one per line, with surrounding whitespace removed.
        The set is empty when 'ml_quick_label_use_skip_lists' is false or
        the file does not exist.
    """
    ret: Set[str] = set()
    if not config.config['ml_quick_label_use_skip_lists']:
        return ret

    quick_skip_file = config.config['ml_quick_label_skip_list_path']
    if not os.path.exists(quick_skip_file):
        return ret

    with open(quick_skip_file, 'r') as f:
        for line in f:
            # strip() rather than line[:-1]: the latter chops a real
            # character when the final line has no trailing newline.
            entry = line.strip()
            if entry:
                ret.add(entry)
    logger.debug(f'Read {quick_skip_file} and found {len(ret)} entries.')
    return ret
def write_skip_list(skip_list) -> None:
    """Persist *skip_list* to the skip-list file, one entry per line.

    Does nothing when the 'ml_quick_label_use_skip_lists' config value is
    false.  Otherwise the file named by 'ml_quick_label_skip_list_path' is
    overwritten; entries are stripped of surrounding whitespace and empty
    entries are dropped.

    Args:
        skip_list: iterable of image path strings.
    """
    if not config.config['ml_quick_label_use_skip_lists']:
        return

    quick_skip_file = config.config['ml_quick_label_skip_list_path']
    with open(quick_skip_file, 'w') as f:
        for filename in skip_list:
            filename = filename.strip()
            if filename:
                f.write(f'{filename}\n')
    logger.debug(f'Updated {quick_skip_file}')
def label(in_spec: InputSpec) -> None:
    """Interactively label the images described by *in_spec*.

    Images come from in_spec.image_file_glob when it is set, otherwise from
    in_spec.image_file_prepopulated_list.  For each image that is not on
    the skip list and whose features file exists, the features file is
    scanned for a line containing in_spec.label.  When no such line is
    present (or the 'ml_quick_label_overwrite_labels' config value is
    true), the image is shown with ``xv``, a single keystroke is read, and
    the features file is rewritten with any old label line replaced by the
    new ``<label>: <value>`` line.  Every image that reaches the scan is
    added to the skip list, which is written back at the end.

    Raises:
        ValueError: if neither image source is provided.
    """
    # Local import: needed only to quote the image path for the shell.
    import shlex

    images = []
    if in_spec.image_file_glob is not None:
        images += glob.glob(in_spec.image_file_glob)
    elif in_spec.image_file_prepopulated_list is not None:
        images += in_spec.image_file_prepopulated_list
    else:
        raise ValueError(
            'One of image_file_glob or image_file_prepopulated_list is required'
        )

    skip_list = read_skip_list()
    for image in images:
        if image in skip_list:
            logger.debug(f'Skipping {image} because of the skip list')
            continue
        features = in_spec.image_file_to_features_file(image)
        if features is None or not os.path.exists(features):
            logger.warning(
                f'File {image} yielded file {features} which does not exist, SKIPPING.'
            )
            continue

        # Read the features file, keeping every line that does not carry
        # the label.  rstrip('\n') instead of line[:-1] so that a final
        # line without a trailing newline does not lose a character.
        with open(features, "r") as f:
            lines = [line.rstrip('\n') for line in f]
        filtered_lines = [line for line in lines if in_spec.label not in line]
        saw_label = len(filtered_lines) != len(lines)

        if not saw_label or config.config['ml_quick_label_overwrite_labels']:
            logger.info(features)
            # Quote the path so names with spaces or shell metacharacters
            # are passed to xv intact.
            os.system(f'xv {shlex.quote(image)} &')
            keystroke = input_utils.single_keystroke_response(
                in_spec.valid_keystrokes,
                prompt=in_spec.prompt,
            )
            os.system('killall xv')
            label_value = in_spec.keystroke_to_label(keystroke)
            # No trailing newline here: one is added per line on write, and
            # a second one would leave a blank line in the features file.
            filtered_lines.append(f"{in_spec.label}: {label_value}")
            with open(features, 'w') as f:
                f.writelines(f"{line}\n" for line in filtered_lines)
        skip_list.add(image)
    write_skip_list(skip_list)
#!/usr/bin/env python3
+from itertools import zip_longest
import json
import random
import re
return in_str
def add_thousands_separator(in_str: str, *, separator_char=',', places=3) -> str:
    """Return *in_str* with its integer digits grouped by a separator.

    Args:
        in_str: the number to format, as a string.  An int is also
            accepted and is converted to its decimal string first.
        separator_char: string inserted between digit groups.
        places: number of digits per group.

    Returns:
        The formatted number string.

    Raises:
        ValueError: if is_number() rejects the (stringified) input; the
            offending value is the exception argument.
    """
    if isinstance(in_str, int):
        in_str = f'{in_str}'

    if is_number(in_str):
        return _add_thousands_separator(
            in_str,
            separator_char=separator_char,
            places=places,
        )
    raise ValueError(in_str)
+
+
def _add_thousands_separator(in_str: str, *, separator_char=',', places=3) -> str:
    """Group the integer digits of the numeric string *in_str*.

    A leading '+' or '-' sign and anything after the first '.' are left
    untouched; only the integer digits in between are grouped, counting
    *places* digits at a time from the right.

    Args:
        in_str: numeric string, e.g. "-1234567.89".
        separator_char: string inserted between digit groups.
        places: number of digits per group.

    Returns:
        The grouped string, e.g. "-1,234,567.89".
    """
    # Peel off the sign first; otherwise it is counted as a digit and
    # "-123" would come out as "-,123".
    sign = ""
    if in_str[:1] in ("+", "-"):
        sign, in_str = in_str[0], in_str[1:]

    integer_part, _, decimal_part = in_str.partition('.')

    # The same iterator repeated `places` times makes zip_longest pull
    # consecutive characters into each tuple, i.e. fixed-size chunks of
    # the reversed digit string.
    chunks = [iter(integer_part[::-1])] * places
    grouped = separator_char.join(
        "".join(group) for group in zip_longest(*chunks, fillvalue="")
    )[::-1]

    ret = sign + grouped
    if decimal_part:
        ret += '.' + decimal_part
    return ret
+
+
+
# Full url example:
# scheme://username:password@www.domain.com:8042/folder/subfolder/file.extension?param=value&param2=value2#hash
def is_url(in_str: Any, allowed_schemes: Optional[List[str]] = None) -> bool: